17#include <unordered_set>
20#include "hazelcast/client/impl/ClientLockReferenceIdGenerator.h"
21#include "hazelcast/client/proxy/PNCounterImpl.h"
22#include "hazelcast/client/spi/ClientContext.h"
23#include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
24#include "hazelcast/client/proxy/flake_id_generator_impl.h"
25#include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
26#include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
27#include "hazelcast/client/client_config.h"
28#include "hazelcast/client/map/data_entry_view.h"
29#include "hazelcast/client/proxy/RingbufferImpl.h"
30#include "hazelcast/client/impl/vector_clock.h"
31#include "hazelcast/client/internal/partition/strategy/StringPartitioningStrategy.h"
32#include "hazelcast/util/Util.h"
33#include "hazelcast/client/topic/reliable_listener.h"
34#include "hazelcast/client/proxy/ITopicImpl.h"
35#include "hazelcast/client/proxy/ReplicatedMapImpl.h"
36#include "hazelcast/client/flake_id_generator.h"
37#include "hazelcast/client/reliable_topic.h"
43reliable_topic::reliable_topic(
const std::string& instance_name,
44 spi::ClientContext* context)
45 : proxy::ProxyImpl(
reliable_topic::SERVICE_NAME, instance_name, context)
47 context->get_client_execution_service().shared_from_this())
48 , executor_(execution_service_->get_user_executor())
49 , logger_(context->get_logger())
51 auto reliable_config =
52 context->get_client_config().lookup_reliable_topic_config(instance_name);
53 if (reliable_config) {
54 batch_size_ = reliable_config->get_read_batch_size();
56 batch_size_ = config::reliable_topic_config::DEFAULT_READ_BATCH_SIZE;
60 context->get_hazelcast_client_implementation()
61 ->get_distributed_object<ringbuffer>(
62 std::string(reliable_topic::TOPIC_RB_PREFIX) + instance_name);
68 int id = util::IOUtil::to_value<int>(registration_id);
69 auto runner = runners_map_.get(
id);
78reliable_topic::on_shutdown()
81 for (
auto& entry : runners_map_.clear()) {
82 entry.second->cancel();
87reliable_topic::on_destroy()
90 for (
auto& entry : runners_map_.clear()) {
91 entry.second->cancel();
96reliable_topic::post_destroy()
99 ringbuffer_.get()->destroy().get();
104 int64_t initial_sequence_id)
105 : loss_tolerant_(loss_tolerant)
106 , initial_sequence_id_(initial_sequence_id)
111ClientLockReferenceIdGenerator::ClientLockReferenceIdGenerator()
112 : reference_id_counter_(0)
116ClientLockReferenceIdGenerator::get_next_reference_id()
118 return ++reference_id_counter_;
123MultiMapImpl::MultiMapImpl(
const std::string& instance_name,
124 spi::ClientContext* context)
125 : ProxyImpl(multi_map::SERVICE_NAME, instance_name, context)
129 lock_reference_id_generator_ =
130 get_context().get_lock_reference_id_generator();
134MultiMapImpl::put(
const serialization::pimpl::data& key,
135 const serialization::pimpl::data& value)
137 auto request = protocol::codec::multimap_put_encode(
138 get_name(), key, value, util::get_current_thread_id());
139 return invoke_and_get_future<bool>(request, key);
142boost::future<std::vector<serialization::pimpl::data>>
143MultiMapImpl::get_data(
const serialization::pimpl::data& key)
145 auto request = protocol::codec::multimap_get_encode(
146 get_name(), key, util::get_current_thread_id());
147 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
152MultiMapImpl::remove(
const serialization::pimpl::data& key,
153 const serialization::pimpl::data& value)
155 auto request = protocol::codec::multimap_removeentry_encode(
156 get_name(), key, value, util::get_current_thread_id());
157 return invoke_and_get_future<bool>(request, key);
160boost::future<std::vector<serialization::pimpl::data>>
161MultiMapImpl::remove_data(
const serialization::pimpl::data& key)
163 auto request = protocol::codec::multimap_remove_encode(
164 get_name(), key, util::get_current_thread_id());
165 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
169boost::future<std::vector<serialization::pimpl::data>>
170MultiMapImpl::key_set_data()
172 auto request = protocol::codec::multimap_keyset_encode(get_name());
173 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
177boost::future<std::vector<serialization::pimpl::data>>
178MultiMapImpl::values_data()
180 auto request = protocol::codec::multimap_values_encode(get_name());
181 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
185boost::future<EntryVector>
186MultiMapImpl::entry_set_data()
188 auto request = protocol::codec::multimap_entryset_encode(get_name());
189 return invoke_and_get_future<EntryVector>(request);
193MultiMapImpl::contains_key(
const serialization::pimpl::data& key)
195 auto request = protocol::codec::multimap_containskey_encode(
196 get_name(), key, util::get_current_thread_id());
197 return invoke_and_get_future<bool>(request, key);
201MultiMapImpl::contains_value(
const serialization::pimpl::data& value)
204 protocol::codec::multimap_containsvalue_encode(get_name(), value);
205 return invoke_and_get_future<bool>(request);
209MultiMapImpl::contains_entry(
const serialization::pimpl::data& key,
210 const serialization::pimpl::data& value)
212 auto request = protocol::codec::multimap_containsentry_encode(
213 get_name(), key, value, util::get_current_thread_id());
214 return invoke_and_get_future<bool>(request, key);
220 auto request = protocol::codec::multimap_size_encode(get_name());
221 return invoke_and_get_future<int>(request);
227 auto request = protocol::codec::multimap_clear_encode(get_name());
228 return to_void_future(invoke(request));
232MultiMapImpl::value_count(
const serialization::pimpl::data& key)
234 auto request = protocol::codec::multimap_valuecount_encode(
235 get_name(), key, util::get_current_thread_id());
236 return invoke_and_get_future<int>(request, key);
239boost::future<boost::uuids::uuid>
240MultiMapImpl::add_entry_listener(
241 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
244 return register_listener(
245 create_multi_map_entry_listener_codec(include_value),
246 std::move(entry_event_handler));
249boost::future<boost::uuids::uuid>
250MultiMapImpl::add_entry_listener(
251 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
253 serialization::pimpl::data&& key)
255 return register_listener(
256 create_multi_map_entry_listener_codec(include_value, std::move(key)),
257 std::move(entry_event_handler));
261MultiMapImpl::remove_entry_listener(boost::uuids::uuid registration_id)
263 return get_context().get_client_listener_service().deregister_listener(
268MultiMapImpl::lock(
const serialization::pimpl::data& key)
270 return lock(key, std::chrono::milliseconds(-1));
274MultiMapImpl::lock(
const serialization::pimpl::data& key,
275 std::chrono::milliseconds lease_time)
277 auto request = protocol::codec::multimap_lock_encode(
280 util::get_current_thread_id(),
281 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
282 lock_reference_id_generator_->get_next_reference_id());
283 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
287MultiMapImpl::is_locked(
const serialization::pimpl::data& key)
289 auto request = protocol::codec::multimap_islocked_encode(get_name(), key);
290 return invoke_and_get_future<bool>(request, key);
294MultiMapImpl::try_lock(
const serialization::pimpl::data& key)
296 auto request = protocol::codec::multimap_trylock_encode(
299 util::get_current_thread_id(),
302 lock_reference_id_generator_->get_next_reference_id());
303 return invoke_and_get_future<bool>(request, key);
307MultiMapImpl::try_lock(
const serialization::pimpl::data& key,
308 std::chrono::milliseconds timeout)
310 return try_lock(key, timeout, std::chrono::milliseconds(INT64_MAX));
314MultiMapImpl::try_lock(
const serialization::pimpl::data& key,
315 std::chrono::milliseconds timeout,
316 std::chrono::milliseconds lease_time)
318 auto request = protocol::codec::multimap_trylock_encode(
321 util::get_current_thread_id(),
322 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
323 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
324 lock_reference_id_generator_->get_next_reference_id());
325 return invoke_and_get_future<bool>(request, key);
329MultiMapImpl::unlock(
const serialization::pimpl::data& key)
331 auto request = protocol::codec::multimap_unlock_encode(
334 util::get_current_thread_id(),
335 lock_reference_id_generator_->get_next_reference_id());
336 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
340MultiMapImpl::force_unlock(
const serialization::pimpl::data& key)
342 auto request = protocol::codec::multimap_forceunlock_encode(
343 get_name(), key, lock_reference_id_generator_->get_next_reference_id());
344 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
347std::shared_ptr<spi::impl::ListenerMessageCodec>
348MultiMapImpl::create_multi_map_entry_listener_codec(
bool include_value)
350 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
351 new MultiMapEntryListenerMessageCodec(get_name(), include_value));
354std::shared_ptr<spi::impl::ListenerMessageCodec>
355MultiMapImpl::create_multi_map_entry_listener_codec(
357 serialization::pimpl::data&& key)
359 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
360 new MultiMapEntryListenerToKeyCodec(
361 get_name(), include_value, std::move(key)));
365MultiMapImpl::on_initialize()
367 ProxyImpl::on_initialize();
368 lock_reference_id_generator_ =
369 get_context().get_lock_reference_id_generator();
372MultiMapImpl::MultiMapEntryListenerMessageCodec::
373 MultiMapEntryListenerMessageCodec(std::string name,
bool include_value)
374 : name_(std::move(name))
375 , include_value_(include_value)
378protocol::ClientMessage
379MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_add_request(
380 bool local_only)
const
382 return protocol::codec::multimap_addentrylistener_encode(
383 name_, include_value_, local_only);
386protocol::ClientMessage
387MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_remove_request(
388 boost::uuids::uuid real_registration_id)
const
390 return protocol::codec::multimap_removeentrylistener_encode(
391 name_, real_registration_id);
394protocol::ClientMessage
395MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_add_request(
396 bool local_only)
const
398 return protocol::codec::multimap_addentrylistenertokey_encode(
399 name_, key_, include_value_, local_only);
402protocol::ClientMessage
403MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_remove_request(
404 boost::uuids::uuid real_registration_id)
const
406 return protocol::codec::multimap_removeentrylistener_encode(
407 name_, real_registration_id);
410MultiMapImpl::MultiMapEntryListenerToKeyCodec::MultiMapEntryListenerToKeyCodec(
413 serialization::pimpl::data&& key)
414 : name_(std::move(name))
415 , include_value_(include_value)
416 , key_(std::move(key))
419const std::shared_ptr<std::unordered_set<member>>
420 PNCounterImpl::EMPTY_ADDRESS_LIST(
new std::unordered_set<member>());
422PNCounterImpl::PNCounterImpl(
const std::string& service_name,
423 const std::string& object_name,
424 spi::ClientContext* context)
425 : ProxyImpl(service_name, object_name, context)
426 , max_configured_replica_count_(0)
428 std::shared_ptr<impl::vector_clock>(new impl::vector_clock()))
429 , logger_(context->get_logger())
433operator<<(std::ostream& os,
const PNCounterImpl& proxy)
435 os <<
"PNCounter{name='" << proxy.get_name() <<
"\'}";
439boost::future<int64_t>
442 boost::shared_ptr<member> target =
443 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
445 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
446 "ClientPNCounterProxy::get",
447 "Cannot invoke operations on a CRDT because the cluster does not "
448 "contain any data members"));
450 return invoke_get_internal(EMPTY_ADDRESS_LIST,
nullptr, target);
453boost::future<int64_t>
454PNCounterImpl::get_and_add(int64_t delta)
456 boost::shared_ptr<member> target =
457 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
459 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
460 "ClientPNCounterProxy::getAndAdd",
461 "Cannot invoke operations on a CRDT because the cluster does not "
462 "contain any data members"));
464 return invoke_add_internal(
465 delta,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
468boost::future<int64_t>
469PNCounterImpl::add_and_get(int64_t delta)
471 boost::shared_ptr<member> target =
472 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
474 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
475 "ClientPNCounterProxy::addAndGet",
476 "Cannot invoke operations on a CRDT because the cluster does not "
477 "contain any data members"));
479 return invoke_add_internal(
480 delta,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
483boost::future<int64_t>
484PNCounterImpl::get_and_subtract(int64_t delta)
486 boost::shared_ptr<member> target =
487 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
489 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
490 "ClientPNCounterProxy::getAndSubtract",
491 "Cannot invoke operations on a CRDT because the cluster does not "
492 "contain any data members"));
494 return invoke_add_internal(
495 -delta,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
498boost::future<int64_t>
499PNCounterImpl::subtract_and_get(int64_t delta)
501 boost::shared_ptr<member> target =
502 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
504 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
505 "ClientPNCounterProxy::subtractAndGet",
506 "Cannot invoke operations on a CRDT because the cluster does not "
507 "contain any data members"));
509 return invoke_add_internal(
510 -delta,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
513boost::future<int64_t>
514PNCounterImpl::decrement_and_get()
516 boost::shared_ptr<member> target =
517 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
519 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
520 "ClientPNCounterProxy::decrementAndGet",
521 "Cannot invoke operations on a CRDT because the cluster does not "
522 "contain any data members"));
524 return invoke_add_internal(-1,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
527boost::future<int64_t>
528PNCounterImpl::increment_and_get()
530 boost::shared_ptr<member> target =
531 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
533 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
534 "ClientPNCounterProxy::incrementAndGet",
535 "Cannot invoke operations on a CRDT because the cluster does not "
536 "contain any data members"));
538 return invoke_add_internal(1,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
541boost::future<int64_t>
542PNCounterImpl::get_and_decrement()
544 boost::shared_ptr<member> target =
545 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
547 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
548 "ClientPNCounterProxy::getAndDecrement",
549 "Cannot invoke operations on a CRDT because the cluster does not "
550 "contain any data members"));
552 return invoke_add_internal(-1,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
555boost::future<int64_t>
556PNCounterImpl::get_and_increment()
558 boost::shared_ptr<member> target =
559 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
561 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
562 "ClientPNCounterProxy::getAndIncrement",
563 "Cannot invoke operations on a CRDT because the cluster does not "
564 "contain any data members"));
566 return invoke_add_internal(1,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
570PNCounterImpl::reset()
573 std::shared_ptr<impl::vector_clock>(
new impl::vector_clock());
574 return boost::make_ready_future();
577boost::shared_ptr<member>
578PNCounterImpl::get_crdt_operation_target(
579 const std::unordered_set<member>& excluded_addresses)
581 auto replicaAddress = current_target_replica_address_.load();
582 if (replicaAddress &&
583 excluded_addresses.find(*replicaAddress) == excluded_addresses.end()) {
584 return replicaAddress;
588 std::lock_guard<std::mutex> guard(target_selection_mutex_);
589 replicaAddress = current_target_replica_address_.load();
590 if (!replicaAddress || excluded_addresses.find(*replicaAddress) !=
591 excluded_addresses.end()) {
592 current_target_replica_address_ =
593 choose_target_replica(excluded_addresses);
596 return current_target_replica_address_;
599boost::shared_ptr<member>
600PNCounterImpl::choose_target_replica(
601 const std::unordered_set<member>& excluded_addresses)
603 std::vector<member> replicaAddresses =
604 get_replica_addresses(excluded_addresses);
605 if (replicaAddresses.empty()) {
610 int randomReplicaIndex = std::abs(rand()) % (int)replicaAddresses.size();
611 return boost::make_shared<member>(replicaAddresses[randomReplicaIndex]);
615PNCounterImpl::get_replica_addresses(
616 const std::unordered_set<member>& excluded_members)
618 std::vector<member> dataMembers =
619 get_context().get_client_cluster_service().get_members(
620 *member_selectors::DATA_MEMBER_SELECTOR);
621 int32_t replicaCount = get_max_configured_replica_count();
622 int currentReplicaCount =
623 util::min<int>(replicaCount, (
int)dataMembers.size());
625 std::vector<member> replicaMembers;
626 for (
int i = 0; i < currentReplicaCount; i++) {
627 if (excluded_members.find(dataMembers[i]) == excluded_members.end()) {
628 replicaMembers.push_back(dataMembers[i]);
631 return replicaMembers;
635PNCounterImpl::get_max_configured_replica_count()
637 if (max_configured_replica_count_ > 0) {
638 return max_configured_replica_count_;
641 protocol::codec::pncounter_getconfiguredreplicacount_encode(
643 max_configured_replica_count_ =
644 invoke_and_get_future<int32_t>(request).get();
646 return max_configured_replica_count_;
649boost::shared_ptr<member>
650PNCounterImpl::try_choose_a_new_target(
651 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
652 boost::shared_ptr<member> last_target,
653 const exception::hazelcast_& last_exception)
658 boost::str(boost::format(
659 "Exception occurred while invoking operation on target %1%, "
660 "choosing different target. Cause: %2%") %
661 last_target % last_exception));
662 if (excluded_addresses == EMPTY_ADDRESS_LIST) {
665 excluded_addresses = std::make_shared<std::unordered_set<member>>();
667 excluded_addresses->insert(*last_target);
668 return get_crdt_operation_target(*excluded_addresses);
671boost::future<int64_t>
672PNCounterImpl::invoke_get_internal(
673 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
674 std::exception_ptr last_exception,
675 const boost::shared_ptr<member>& target)
678 if (last_exception) {
679 std::rethrow_exception(last_exception);
681 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
682 "ClientPNCounterProxy::invokeGetInternal",
683 "Cannot invoke operations on a CRDT because the cluster does not "
684 "contain any data members"));
688 auto timestamps = observed_clock_.get()->entry_set();
689 auto request = protocol::codec::pncounter_get_encode(
690 get_name(), timestamps, target->get_uuid());
691 return invoke_on_member(request, target->get_uuid())
693 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
695 return get_and_update_timestamps(std::move(f));
696 }
catch (exception::hazelcast_& e) {
697 return invoke_get_internal(excluded_addresses,
698 std::current_exception(),
699 try_choose_a_new_target(
700 excluded_addresses, target, e))
704 }
catch (exception::hazelcast_& e) {
705 return invoke_get_internal(
707 std::current_exception(),
708 try_choose_a_new_target(excluded_addresses, target, e));
712boost::future<int64_t>
713PNCounterImpl::invoke_add_internal(
715 bool getBeforeUpdate,
716 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
717 std::exception_ptr last_exception,
718 const boost::shared_ptr<member>& target)
721 if (last_exception) {
722 std::rethrow_exception(last_exception);
724 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
725 "ClientPNCounterProxy::invokeGetInternal",
726 "Cannot invoke operations on a CRDT because the cluster does not "
727 "contain any data members"));
732 auto request = protocol::codec::pncounter_add_encode(
736 observed_clock_.get()->entry_set(),
738 return invoke_on_member(request, target->get_uuid())
740 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
742 return get_and_update_timestamps(std::move(f));
743 }
catch (exception::hazelcast_& e) {
744 return invoke_add_internal(delta,
747 std::current_exception(),
748 try_choose_a_new_target(
749 excluded_addresses, target, e))
753 }
catch (exception::hazelcast_& e) {
754 return invoke_add_internal(
758 std::current_exception(),
759 try_choose_a_new_target(excluded_addresses, target, e));
764PNCounterImpl::get_and_update_timestamps(
765 boost::future<protocol::ClientMessage> f)
768 auto value = msg.get_first_fixed_sized_field<int64_t>();
771 update_observed_replica_timestamps(
772 msg.get<impl::vector_clock::timestamp_vector>());
777PNCounterImpl::update_observed_replica_timestamps(
778 const impl::vector_clock::timestamp_vector& received_logical_timestamps)
780 std::shared_ptr<impl::vector_clock> received =
781 to_vector_clock(received_logical_timestamps);
783 std::shared_ptr<impl::vector_clock> currentClock =
784 this->observed_clock_;
785 if (currentClock->is_after(*received)) {
788 if (observed_clock_.compare_and_set(currentClock, received)) {
794std::shared_ptr<impl::vector_clock>
795PNCounterImpl::to_vector_clock(
796 const impl::vector_clock::timestamp_vector& replica_logical_timestamps)
798 return std::shared_ptr<impl::vector_clock>(
799 new impl::vector_clock(replica_logical_timestamps));
802boost::shared_ptr<member>
803PNCounterImpl::get_current_target_replica_address()
805 return current_target_replica_address_.load();
808IListImpl::IListImpl(
const std::string& instance_name,
809 spi::ClientContext* context)
810 : ProxyImpl(
"hz:impl:listService", instance_name, context)
812 serialization::pimpl::data key_data =
813 get_context().get_serialization_service().to_data<std::string>(
815 partition_id_ = get_partition_id(key_data);
819IListImpl::remove_item_listener(boost::uuids::uuid registration_id)
821 return get_context().get_client_listener_service().deregister_listener(
828 auto request = protocol::codec::list_size_encode(get_name());
829 return invoke_and_get_future<int>(request, partition_id_);
835 auto request = protocol::codec::list_isempty_encode(get_name());
836 return invoke_and_get_future<bool>(request, partition_id_);
840IListImpl::contains(
const serialization::pimpl::data& element)
842 auto request = protocol::codec::list_contains_encode(get_name(), element);
843 return invoke_and_get_future<bool>(request, partition_id_);
846boost::future<std::vector<serialization::pimpl::data>>
847IListImpl::to_array_data()
849 auto request = protocol::codec::list_getall_encode(get_name());
850 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
851 request, partition_id_);
855IListImpl::add(
const serialization::pimpl::data& element)
857 auto request = protocol::codec::list_add_encode(get_name(), element);
858 return invoke_and_get_future<bool>(request, partition_id_);
862IListImpl::remove(
const serialization::pimpl::data& element)
864 auto request = protocol::codec::list_remove_encode(get_name(), element);
865 return invoke_and_get_future<bool>(request, partition_id_);
869IListImpl::contains_all_data(
870 const std::vector<serialization::pimpl::data>& elements)
873 protocol::codec::list_containsall_encode(get_name(), elements);
874 return invoke_and_get_future<bool>(request, partition_id_);
878IListImpl::add_all_data(
const std::vector<serialization::pimpl::data>& elements)
880 auto request = protocol::codec::list_addall_encode(get_name(), elements);
881 return invoke_and_get_future<bool>(request, partition_id_);
885IListImpl::add_all_data(
int index,
886 const std::vector<serialization::pimpl::data>& elements)
889 protocol::codec::list_addallwithindex_encode(get_name(), index, elements);
890 return invoke_and_get_future<bool>(request, partition_id_);
894IListImpl::remove_all_data(
895 const std::vector<serialization::pimpl::data>& elements)
898 protocol::codec::list_compareandremoveall_encode(get_name(), elements);
899 return invoke_and_get_future<bool>(request, partition_id_);
903IListImpl::retain_all_data(
904 const std::vector<serialization::pimpl::data>& elements)
907 protocol::codec::list_compareandretainall_encode(get_name(), elements);
908 return invoke_and_get_future<bool>(request, partition_id_);
914 auto request = protocol::codec::list_clear_encode(get_name());
915 return to_void_future(invoke_on_partition(request, partition_id_));
918boost::future<boost::optional<serialization::pimpl::data>>
919IListImpl::get_data(
int index)
921 auto request = protocol::codec::list_get_encode(get_name(), index);
922 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
923 request, partition_id_);
926boost::future<boost::optional<serialization::pimpl::data>>
927IListImpl::set_data(
int index,
const serialization::pimpl::data& element)
929 auto request = protocol::codec::list_set_encode(get_name(), index, element);
930 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
931 request, partition_id_);
935IListImpl::add(
int index,
const serialization::pimpl::data& element)
938 protocol::codec::list_addwithindex_encode(get_name(), index, element);
939 return to_void_future(invoke_on_partition(request, partition_id_));
942boost::future<boost::optional<serialization::pimpl::data>>
943IListImpl::remove_data(
int index)
946 protocol::codec::list_removewithindex_encode(get_name(), index);
947 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
948 request, partition_id_);
952IListImpl::index_of(
const serialization::pimpl::data& element)
954 auto request = protocol::codec::list_indexof_encode(get_name(), element);
955 return invoke_and_get_future<int>(request, partition_id_);
959IListImpl::last_index_of(
const serialization::pimpl::data& element)
962 protocol::codec::list_lastindexof_encode(get_name(), element);
963 return invoke_and_get_future<int>(request, partition_id_);
966boost::future<std::vector<serialization::pimpl::data>>
967IListImpl::sub_list_data(
int from_index,
int to_index)
970 protocol::codec::list_sub_encode(get_name(), from_index, to_index);
971 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
972 request, partition_id_);
975std::shared_ptr<spi::impl::ListenerMessageCodec>
976IListImpl::create_item_listener_codec(
bool include_value)
978 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
979 new ListListenerMessageCodec(get_name(), include_value));
982IListImpl::ListListenerMessageCodec::ListListenerMessageCodec(
985 : name_(std::move(name))
986 , include_value_(include_value)
989protocol::ClientMessage
990IListImpl::ListListenerMessageCodec::encode_add_request(
bool local_only)
const
992 return protocol::codec::list_addlistener_encode(
993 name_, include_value_, local_only);
996protocol::ClientMessage
997IListImpl::ListListenerMessageCodec::encode_remove_request(
998 boost::uuids::uuid real_registration_id)
const
1000 return protocol::codec::list_removelistener_encode(name_,
1001 real_registration_id);
1004flake_id_generator_impl::Block::Block(IdBatch&& id_batch,
1005 std::chrono::milliseconds validity)
1006 : id_batch_(id_batch)
1007 , invalid_since_(std::chrono::steady_clock::now() + validity)
1012flake_id_generator_impl::Block::next()
1014 if (invalid_since_ <= std::chrono::steady_clock::now()) {
1019 index = num_returned_;
1020 if (index == id_batch_.get_batch_size()) {
1023 }
while (!num_returned_.compare_exchange_strong(index, index + 1));
1025 return id_batch_.get_base() + index * id_batch_.get_increment();
1028flake_id_generator_impl::IdBatch::IdIterator
1029 flake_id_generator_impl::IdBatch::endOfBatch;
1032flake_id_generator_impl::IdBatch::get_base()
const
1038flake_id_generator_impl::IdBatch::get_increment()
const
1044flake_id_generator_impl::IdBatch::get_batch_size()
const
1049flake_id_generator_impl::IdBatch::IdBatch(int64_t base,
1053 , increment_(increment)
1054 , batch_size_(batch_size)
1057flake_id_generator_impl::IdBatch::IdIterator&
1058flake_id_generator_impl::IdBatch::end()
1063flake_id_generator_impl::IdBatch::IdIterator
1064flake_id_generator_impl::IdBatch::iterator()
1066 return flake_id_generator_impl::IdBatch::IdIterator(
1067 base_, increment_, batch_size_);
1070flake_id_generator_impl::IdBatch::IdIterator::IdIterator(
1072 const int64_t increment,
1075 , increment_(increment)
1076 , remaining_(remaining)
1080flake_id_generator_impl::IdBatch::IdIterator::operator==(
1081 const flake_id_generator_impl::IdBatch::IdIterator& rhs)
const
1083 return base2_ == rhs.base2_ && increment_ == rhs.increment_ &&
1084 remaining_ == rhs.remaining_;
1088flake_id_generator_impl::IdBatch::IdIterator::operator!=(
1089 const flake_id_generator_impl::IdBatch::IdIterator& rhs)
const
1091 return !(rhs == *
this);
1094flake_id_generator_impl::IdBatch::IdIterator::operator*()
const
1099flake_id_generator_impl::IdBatch::IdIterator::IdIterator()
1105flake_id_generator_impl::IdBatch::IdIterator&
1106flake_id_generator_impl::IdBatch::IdIterator::operator++()
1108 if (remaining_ == 0) {
1109 return flake_id_generator_impl::IdBatch::end();
1114 base2_ += increment_;
1119flake_id_generator_impl::flake_id_generator_impl(
1120 const std::string& service_name,
1121 const std::string& object_name,
1122 spi::ClientContext* context)
1123 : ProxyImpl(service_name, object_name, context)
1127 context->get_client_config().find_flake_id_generator_config(object_name);
1128 batch_size_ = config->get_prefetch_count();
1129 validity_ = config->get_prefetch_validity_duration();
1133flake_id_generator_impl::new_id_internal()
1135 auto b = block_.load();
1137 int64_t res = b->next();
1138 if (res != INT64_MIN) {
1143 throw std::overflow_error(
"");
1146boost::future<int64_t>
1147flake_id_generator_impl::new_id()
1150 return boost::make_ready_future(new_id_internal());
1151 }
catch (std::overflow_error&) {
1152 return new_id_batch(batch_size_)
1153 .then(boost::launch::sync,
1154 [=](boost::future<flake_id_generator_impl::IdBatch> f) {
1156 boost::make_shared<Block>(f.get(), validity_);
1157 auto value = newBlock->next();
1158 auto b = block_.load();
1159 block_.compare_exchange_strong(b, newBlock);
1165boost::future<flake_id_generator_impl::IdBatch>
1166flake_id_generator_impl::new_id_batch(int32_t size)
1169 protocol::codec::flakeidgenerator_newidbatch_encode(get_name(), size);
1170 return invoke(request).then(
1171 boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
1173 msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN);
1175 auto base = msg.get<int64_t>();
1176 auto increment = msg.get<int64_t>();
1177 auto batch_size = msg.get<int32_t>();
1178 return flake_id_generator_impl::IdBatch(base, increment, batch_size);
1182IQueueImpl::IQueueImpl(
const std::string& instance_name,
1183 spi::ClientContext* context)
1184 : ProxyImpl(
"hz:impl:queueService", instance_name, context)
1186 serialization::pimpl::data data =
1187 get_context().get_serialization_service().to_data<std::string>(
1189 partition_id_ = get_partition_id(data);
1193IQueueImpl::remove_item_listener(boost::uuids::uuid registration_id)
1195 return get_context().get_client_listener_service().deregister_listener(
1200IQueueImpl::offer(
const serialization::pimpl::data& element,
1201 std::chrono::milliseconds timeout)
1203 auto request = protocol::codec::queue_offer_encode(
1206 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1207 return invoke_and_get_future<bool>(request, partition_id_);
1211IQueueImpl::put(
const serialization::pimpl::data& element)
1213 auto request = protocol::codec::queue_put_encode(get_name(), element);
1214 return to_void_future(invoke_on_partition(request, partition_id_));
1217boost::future<boost::optional<serialization::pimpl::data>>
1218IQueueImpl::poll_data(std::chrono::milliseconds timeout)
1220 auto request = protocol::codec::queue_poll_encode(
1222 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1223 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1224 request, partition_id_);
1228IQueueImpl::remaining_capacity()
1230 auto request = protocol::codec::queue_remainingcapacity_encode(get_name());
1231 return invoke_and_get_future<int>(request, partition_id_);
1235IQueueImpl::remove(
const serialization::pimpl::data& element)
1237 auto request = protocol::codec::queue_remove_encode(get_name(), element);
1238 return invoke_and_get_future<bool>(request, partition_id_);
1242IQueueImpl::contains(
const serialization::pimpl::data& element)
1244 auto request = protocol::codec::queue_contains_encode(get_name(), element);
1245 return invoke_and_get_future<bool>(request, partition_id_);
1248boost::future<std::vector<serialization::pimpl::data>>
1249IQueueImpl::drain_to_data(
size_t max_elements)
1251 auto request = protocol::codec::queue_draintomaxsize_encode(
1252 get_name(), (int32_t)max_elements);
1254 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1255 request, partition_id_);
1258boost::future<std::vector<serialization::pimpl::data>>
1259IQueueImpl::drain_to_data()
1261 auto request = protocol::codec::queue_drainto_encode(get_name());
1262 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1263 request, partition_id_);
1266boost::future<boost::optional<serialization::pimpl::data>>
1267IQueueImpl::take_data()
1269 auto request = protocol::codec::queue_take_encode(get_name());
1270 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1271 request, partition_id_);
1274boost::future<boost::optional<serialization::pimpl::data>>
1275IQueueImpl::peek_data()
1277 auto request = protocol::codec::queue_peek_encode(get_name());
1278 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1279 request, partition_id_);
1285 auto request = protocol::codec::queue_size_encode(get_name());
1286 return invoke_and_get_future<int>(request, partition_id_);
1290IQueueImpl::is_empty()
1292 auto request = protocol::codec::queue_isempty_encode(get_name());
1293 return invoke_and_get_future<bool>(request, partition_id_);
1296boost::future<std::vector<serialization::pimpl::data>>
1297IQueueImpl::to_array_data()
1299 auto request = protocol::codec::queue_iterator_encode(get_name());
1300 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1301 request, partition_id_);
1305IQueueImpl::contains_all_data(
1306 const std::vector<serialization::pimpl::data>& elements)
1309 protocol::codec::queue_containsall_encode(get_name(), elements);
1310 return invoke_and_get_future<bool>(request, partition_id_);
1314IQueueImpl::add_all_data(
1315 const std::vector<serialization::pimpl::data>& elements)
1317 auto request = protocol::codec::queue_addall_encode(get_name(), elements);
1318 return invoke_and_get_future<bool>(request, partition_id_);
1322IQueueImpl::remove_all_data(
1323 const std::vector<serialization::pimpl::data>& elements)
1326 protocol::codec::queue_compareandremoveall_encode(get_name(), elements);
1327 return invoke_and_get_future<bool>(request, partition_id_);
1331IQueueImpl::retain_all_data(
1332 const std::vector<serialization::pimpl::data>& elements)
1335 protocol::codec::queue_compareandretainall_encode(get_name(), elements);
1336 return invoke_and_get_future<bool>(request, partition_id_);
1342 auto request = protocol::codec::queue_clear_encode(get_name());
1343 return to_void_future(invoke_on_partition(request, partition_id_));
1346std::shared_ptr<spi::impl::ListenerMessageCodec>
1347IQueueImpl::create_item_listener_codec(
bool include_value)
1349 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1350 new QueueListenerMessageCodec(get_name(), include_value));
1353IQueueImpl::QueueListenerMessageCodec::QueueListenerMessageCodec(
1356 : name_(std::move(name))
1357 , include_value_(include_value)
1360protocol::ClientMessage
1361IQueueImpl::QueueListenerMessageCodec::encode_add_request(
bool local_only)
const
1363 return protocol::codec::queue_addlistener_encode(
1364 name_, include_value_, local_only);
1367protocol::ClientMessage
1368IQueueImpl::QueueListenerMessageCodec::encode_remove_request(
1369 boost::uuids::uuid real_registration_id)
const
1371 return protocol::codec::queue_removelistener_encode(name_,
1372 real_registration_id);
1375ProxyImpl::ProxyImpl(
const std::string& service_name,
1376 const std::string& object_name,
1377 spi::ClientContext* context)
1378 : ClientProxy(object_name, service_name, *context)
1379 , SerializingProxy(*context, object_name)
1382ProxyImpl::~ProxyImpl() =
default;
1384SerializingProxy::SerializingProxy(spi::ClientContext& context,
1385 const std::string& object_name)
1386 : serialization_service_(context.get_serialization_service())
1387 , partition_service_(context.get_partition_service())
1388 , object_name_(object_name)
1389 , client_context_(context)
1393SerializingProxy::get_partition_id(
const serialization::pimpl::data& key)
1395 return partition_service_.get_partition_id(key);
1398boost::future<protocol::ClientMessage>
1399SerializingProxy::invoke_on_partition(protocol::ClientMessage& request,
1403 return spi::impl::ClientInvocation::create(
1405 std::make_shared<protocol::ClientMessage>(std::move(request)),
1409 }
catch (exception::iexception&) {
1410 util::exception_util::rethrow(std::current_exception());
1411 return boost::make_ready_future(protocol::ClientMessage(0));
1415boost::future<protocol::ClientMessage>
1416SerializingProxy::invoke(protocol::ClientMessage& request)
1419 return spi::impl::ClientInvocation::create(
1421 std::make_shared<protocol::ClientMessage>(std::move(request)),
1424 }
catch (exception::iexception&) {
1425 util::exception_util::rethrow(std::current_exception());
1426 return boost::make_ready_future(protocol::ClientMessage(0));
1430boost::future<protocol::ClientMessage>
1431SerializingProxy::invoke_on_connection(
1432 protocol::ClientMessage& request,
1433 std::shared_ptr<connection::Connection> connection)
1436 return spi::impl::ClientInvocation::create(
1438 std::make_shared<protocol::ClientMessage>(std::move(request)),
1442 }
catch (exception::iexception&) {
1443 util::exception_util::rethrow(std::current_exception());
1444 return boost::make_ready_future(protocol::ClientMessage(0));
1448boost::future<protocol::ClientMessage>
1449SerializingProxy::invoke_on_key_owner(
1450 protocol::ClientMessage& request,
1451 const serialization::pimpl::data& key_data)
1454 return invoke_on_partition(request, get_partition_id(key_data));
1455 }
catch (exception::iexception&) {
1456 util::exception_util::rethrow(std::current_exception());
1457 return boost::make_ready_future(protocol::ClientMessage(0));
1461boost::future<protocol::ClientMessage>
1462SerializingProxy::invoke_on_member(protocol::ClientMessage& request,
1463 boost::uuids::uuid uuid)
1466 auto invocation = spi::impl::ClientInvocation::create(
1468 std::make_shared<protocol::ClientMessage>(std::move(request)),
1471 return invocation->invoke();
1472 }
catch (exception::iexception&) {
1473 util::exception_util::rethrow(std::current_exception());
1474 return boost::make_ready_future(protocol::ClientMessage(0));
1479boost::future<boost::optional<serialization::pimpl::data>>
1480SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request)
1482 return decode_optional_var_sized<serialization::pimpl::data>(
1487boost::future<boost::optional<map::data_entry_view>>
1488SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1489 const serialization::pimpl::data& key)
1491 return decode_optional_var_sized<map::data_entry_view>(
1492 invoke_on_key_owner(request, key));
1496boost::future<boost::optional<serialization::pimpl::data>>
1497SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1500 return decode_optional_var_sized<serialization::pimpl::data>(
1501 invoke_on_partition(request, partition_id));
1505boost::future<boost::optional<serialization::pimpl::data>>
1506SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1507 const serialization::pimpl::data& key)
1509 return decode_optional_var_sized<serialization::pimpl::data>(
1510 invoke_on_key_owner(request, key));
1513PartitionSpecificClientProxy::PartitionSpecificClientProxy(
1514 const std::string& service_name,
1515 const std::string& object_name,
1516 spi::ClientContext* context)
1517 : ProxyImpl(service_name, object_name, context)
1522PartitionSpecificClientProxy::on_initialize()
1524 std::string partitionKey = internal::partition::strategy::
1525 StringPartitioningStrategy::get_partition_key(name_);
1526 partition_id_ = get_context().get_partition_service().get_partition_id(
1527 to_data<std::string>(partitionKey));
1530IMapImpl::IMapImpl(
const std::string& instance_name,
1531 spi::ClientContext* context)
1532 : ProxyImpl(
"hz:impl:mapService", instance_name, context)
1536IMapImpl::contains_key(
const serialization::pimpl::data& key)
1538 auto request = protocol::codec::map_containskey_encode(
1539 get_name(), key, util::get_current_thread_id());
1540 return invoke_and_get_future<bool>(request, key);
1544IMapImpl::contains_value(
const serialization::pimpl::data& value)
1546 auto request = protocol::codec::map_containsvalue_encode(get_name(), value);
1547 return invoke_and_get_future<bool>(request);
1550boost::future<boost::optional<serialization::pimpl::data>>
1551IMapImpl::get_data(
const serialization::pimpl::data& key)
1553 auto request = protocol::codec::map_get_encode(
1554 get_name(), key, util::get_current_thread_id());
1555 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1559boost::future<boost::optional<serialization::pimpl::data>>
1560IMapImpl::remove_data(
const serialization::pimpl::data& key)
1562 auto request = protocol::codec::map_remove_encode(
1563 get_name(), key, util::get_current_thread_id());
1564 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1569IMapImpl::remove(
const serialization::pimpl::data& key,
1570 const serialization::pimpl::data& value)
1572 auto request = protocol::codec::map_removeifsame_encode(
1573 get_name(), key, value, util::get_current_thread_id());
1574 return invoke_and_get_future<bool>(request, key);
1577boost::future<protocol::ClientMessage>
1578IMapImpl::remove_all(
const serialization::pimpl::data& predicate_data)
1581 protocol::codec::map_removeall_encode(get_name(), predicate_data);
1582 return invoke(request);
1585boost::future<protocol::ClientMessage>
1586IMapImpl::delete_entry(
const serialization::pimpl::data& key)
1588 auto request = protocol::codec::map_delete_encode(
1589 get_name(), key, util::get_current_thread_id());
1590 return invoke_on_partition(request, get_partition_id(key));
1593boost::future<protocol::ClientMessage>
1596 auto request = protocol::codec::map_flush_encode(get_name());
1597 return invoke(request);
1601IMapImpl::try_remove(
const serialization::pimpl::data& key,
1602 std::chrono::milliseconds timeout)
1604 auto request = protocol::codec::map_tryremove_encode(
1607 util::get_current_thread_id(),
1608 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1610 return invoke_and_get_future<bool>(request, key);
1614IMapImpl::try_put(
const serialization::pimpl::data& key,
1615 const serialization::pimpl::data& value,
1616 std::chrono::milliseconds timeout)
1618 auto request = protocol::codec::map_tryput_encode(
1622 util::get_current_thread_id(),
1623 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1625 return invoke_and_get_future<bool>(request, key);
1628boost::future<boost::optional<serialization::pimpl::data>>
1629IMapImpl::put_data(
const serialization::pimpl::data& key,
1630 const serialization::pimpl::data& value,
1631 std::chrono::milliseconds ttl)
1633 auto request = protocol::codec::map_put_encode(
1637 util::get_current_thread_id(),
1638 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1639 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1643boost::future<protocol::ClientMessage>
1644IMapImpl::put_transient(
const serialization::pimpl::data& key,
1645 const serialization::pimpl::data& value,
1646 std::chrono::milliseconds ttl)
1648 auto request = protocol::codec::map_puttransient_encode(
1652 util::get_current_thread_id(),
1653 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1654 return invoke_on_partition(request, get_partition_id(key));
1657boost::future<boost::optional<serialization::pimpl::data>>
1658IMapImpl::put_if_absent_data(
const serialization::pimpl::data& key,
1659 const serialization::pimpl::data& value,
1660 std::chrono::milliseconds ttl)
1662 auto request = protocol::codec::map_putifabsent_encode(
1666 util::get_current_thread_id(),
1667 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1668 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1673IMapImpl::replace(
const serialization::pimpl::data& key,
1674 const serialization::pimpl::data& old_value,
1675 const serialization::pimpl::data& new_value)
1677 auto request = protocol::codec::map_replaceifsame_encode(
1678 get_name(), key, old_value, new_value, util::get_current_thread_id());
1680 return invoke_and_get_future<bool>(request, key);
1683boost::future<boost::optional<serialization::pimpl::data>>
1684IMapImpl::replace_data(
const serialization::pimpl::data& key,
1685 const serialization::pimpl::data& value)
1687 auto request = protocol::codec::map_replace_encode(
1688 get_name(), key, value, util::get_current_thread_id());
1690 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1694boost::future<protocol::ClientMessage>
1695IMapImpl::set(
const serialization::pimpl::data& key,
1696 const serialization::pimpl::data& value,
1697 std::chrono::milliseconds ttl)
1699 auto request = protocol::codec::map_set_encode(
1703 util::get_current_thread_id(),
1704 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1705 return invoke_on_partition(request, get_partition_id(key));
1708boost::future<protocol::ClientMessage>
1709IMapImpl::lock(
const serialization::pimpl::data& key)
1711 return lock(key, std::chrono::milliseconds(-1));
1714boost::future<protocol::ClientMessage>
1715IMapImpl::lock(
const serialization::pimpl::data& key,
1716 std::chrono::milliseconds lease_time)
1718 auto request = protocol::codec::map_lock_encode(
1721 util::get_current_thread_id(),
1722 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1723 lock_reference_id_generator_->get_next_reference_id());
1724 return invoke_on_partition(request, get_partition_id(key));
1728IMapImpl::is_locked(
const serialization::pimpl::data& key)
1730 auto request = protocol::codec::map_islocked_encode(get_name(), key);
1732 return invoke_and_get_future<bool>(request, key);
1736IMapImpl::try_lock(
const serialization::pimpl::data& key,
1737 std::chrono::milliseconds timeout)
1739 return try_lock(key, timeout, std::chrono::milliseconds(-1));
1743IMapImpl::try_lock(
const serialization::pimpl::data& key,
1744 std::chrono::milliseconds timeout,
1745 std::chrono::milliseconds lease_time)
1747 auto request = protocol::codec::map_trylock_encode(
1750 util::get_current_thread_id(),
1751 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1752 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
1753 lock_reference_id_generator_->get_next_reference_id());
1754 return invoke_and_get_future<bool>(request, key);
1757boost::future<protocol::ClientMessage>
1758IMapImpl::unlock(
const serialization::pimpl::data& key)
1760 auto request = protocol::codec::map_unlock_encode(
1763 util::get_current_thread_id(),
1764 lock_reference_id_generator_->get_next_reference_id());
1765 return invoke_on_partition(request, get_partition_id(key));
1768boost::future<protocol::ClientMessage>
1769IMapImpl::force_unlock(
const serialization::pimpl::data& key)
1771 auto request = protocol::codec::map_forceunlock_encode(
1772 get_name(), key, lock_reference_id_generator_->get_next_reference_id());
1773 return invoke_on_partition(request, get_partition_id(key));
1776boost::future<std::string>
1777IMapImpl::add_interceptor(
const serialization::pimpl::data& interceptor)
1780 protocol::codec::map_addinterceptor_encode(get_name(), interceptor);
1781 return invoke_and_get_future<std::string>(request);
1786boost::future<boost::uuids::uuid>
1787IMapImpl::add_entry_listener(
1788 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1790 int32_t listener_flags)
1792 return register_listener(
1793 create_map_entry_listener_codec(include_value, listener_flags),
1794 std::move(entry_event_handler));
1797boost::future<boost::uuids::uuid>
1798IMapImpl::add_entry_listener(
1799 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1800 serialization::pimpl::data&& predicate,
1802 int32_t listener_flags)
1804 return register_listener(
1805 create_map_entry_listener_codec(
1806 include_value, std::move(predicate), listener_flags),
1807 std::move(entry_event_handler));
1811IMapImpl::remove_entry_listener(boost::uuids::uuid registration_id)
1813 return get_context().get_client_listener_service().deregister_listener(
1817boost::future<boost::uuids::uuid>
1818IMapImpl::add_entry_listener(
1819 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1821 serialization::pimpl::data&& key,
1822 int32_t listener_flags)
1824 return register_listener(create_map_entry_listener_codec(
1825 include_value, listener_flags, std::move(key)),
1826 std::move(entry_event_handler));
1829boost::future<boost::optional<map::data_entry_view>>
1830IMapImpl::get_entry_view_data(
const serialization::pimpl::data& key)
1832 auto request = protocol::codec::map_getentryview_encode(
1833 get_name(), key, util::get_current_thread_id());
1834 return invoke_and_get_future<boost::optional<map::data_entry_view>>(request,
1839IMapImpl::evict(
const serialization::pimpl::data& key)
1841 auto request = protocol::codec::map_evict_encode(
1842 get_name(), key, util::get_current_thread_id());
1843 return invoke_and_get_future<bool>(request, key);
1846boost::future<protocol::ClientMessage>
1847IMapImpl::evict_all()
1849 auto request = protocol::codec::map_evictall_encode(get_name());
1850 return invoke(request);
1853boost::future<EntryVector>
1854IMapImpl::get_all_data(
int partition_id,
1855 const std::vector<serialization::pimpl::data>& keys)
1857 auto request = protocol::codec::map_getall_encode(get_name(), keys);
1858 return invoke_and_get_future<EntryVector>(request, partition_id);
1861boost::future<std::vector<serialization::pimpl::data>>
1862IMapImpl::key_set_data()
1864 auto request = protocol::codec::map_keyset_encode(get_name());
1865 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1869boost::future<std::vector<serialization::pimpl::data>>
1870IMapImpl::key_set_data(
const serialization::pimpl::data& predicate)
1873 protocol::codec::map_keysetwithpredicate_encode(get_name(), predicate);
1874 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1879 std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1880IMapImpl::key_set_for_paging_predicate_data(
1881 protocol::codec::holder::paging_predicate_holder
const& predicate)
1883 auto request = protocol::codec::map_keysetwithpagingpredicate_encode(
1884 get_name(), predicate);
1885 return invoke(request).then(
1886 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1887 return get_paging_predicate_response<
1888 std::vector<serialization::pimpl::data>>(std::move(f));
1892boost::future<EntryVector>
1893IMapImpl::entry_set_data()
1895 auto request = protocol::codec::map_entryset_encode(get_name());
1896 return invoke_and_get_future<EntryVector>(request);
1899boost::future<EntryVector>
1900IMapImpl::entry_set_data(
const serialization::pimpl::data& predicate)
1903 protocol::codec::map_entrieswithpredicate_encode(get_name(), predicate);
1904 return invoke_and_get_future<EntryVector>(request);
1907boost::future<std::pair<EntryVector, query::anchor_data_list>>
1908IMapImpl::entry_set_for_paging_predicate_data(
1909 protocol::codec::holder::paging_predicate_holder
const& predicate)
1911 auto request = protocol::codec::map_entrieswithpagingpredicate_encode(
1912 get_name(), predicate);
1913 return invoke(request).then(
1914 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1915 return get_paging_predicate_response<EntryVector>(std::move(f));
1919boost::future<std::vector<serialization::pimpl::data>>
1920IMapImpl::values_data()
1922 auto request = protocol::codec::map_values_encode(get_name());
1923 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1927boost::future<std::vector<serialization::pimpl::data>>
1928IMapImpl::values_data(
const serialization::pimpl::data& predicate)
1931 protocol::codec::map_valueswithpredicate_encode(get_name(), predicate);
1932 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1937 std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1938IMapImpl::values_for_paging_predicate_data(
1939 protocol::codec::holder::paging_predicate_holder
const& predicate)
1941 auto request = protocol::codec::map_valueswithpagingpredicate_encode(
1942 get_name(), predicate);
1943 return invoke(request).then(
1944 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1945 return get_paging_predicate_response<
1946 std::vector<serialization::pimpl::data>>(std::move(f));
1950boost::future<protocol::ClientMessage>
1951IMapImpl::add_index_data(
const config::index_config& config)
1953 auto request = protocol::codec::map_addindex_encode(get_name(), config);
1954 return invoke(request);
1960 auto request = protocol::codec::map_size_encode(get_name());
1961 return invoke_and_get_future<int>(request);
1967 auto request = protocol::codec::map_isempty_encode(get_name());
1968 return invoke_and_get_future<bool>(request);
1971boost::future<protocol::ClientMessage>
1972IMapImpl::put_all_data(
int partition_id,
const EntryVector& entries)
1975 protocol::codec::map_putall_encode(get_name(), entries,
true);
1976 return invoke_on_partition(request, partition_id);
1979boost::future<protocol::ClientMessage>
1980IMapImpl::clear_data()
1982 auto request = protocol::codec::map_clear_encode(get_name());
1983 return invoke(request);
1986boost::future<boost::optional<serialization::pimpl::data>>
1987IMapImpl::execute_on_key_data(
const serialization::pimpl::data& key,
1988 const serialization::pimpl::data& processor)
1990 auto request = protocol::codec::map_executeonkey_encode(
1991 get_name(), processor, key, util::get_current_thread_id());
1992 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1993 request, get_partition_id(key));
1996boost::future<boost::optional<serialization::pimpl::data>>
1997IMapImpl::submit_to_key_data(
const serialization::pimpl::data& key,
1998 const serialization::pimpl::data& processor)
2000 auto request = protocol::codec::map_submittokey_encode(
2001 get_name(), processor, key, util::get_current_thread_id());
2002 return invoke_on_partition(request, get_partition_id(key))
2003 .then(boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
2006 return msg.get_nullable<serialization::pimpl::data>();
2010boost::future<EntryVector>
2011IMapImpl::execute_on_keys_data(
2012 const std::vector<serialization::pimpl::data>& keys,
2013 const serialization::pimpl::data& processor)
2016 protocol::codec::map_executeonkeys_encode(get_name(), processor, keys);
2017 return invoke_and_get_future<EntryVector>(request);
2020boost::future<protocol::ClientMessage>
2021IMapImpl::remove_interceptor(
const std::string&
id)
2024 protocol::codec::map_removeinterceptor_encode(get_name(),
id);
2025 return invoke(request);
2028boost::future<EntryVector>
2029IMapImpl::execute_on_entries_data(
2030 const serialization::pimpl::data& entry_processor)
2033 protocol::codec::map_executeonallkeys_encode(get_name(), entry_processor);
2034 return invoke_and_get_future<EntryVector>(request);
2037boost::future<EntryVector>
2038IMapImpl::execute_on_entries_data(
2039 const serialization::pimpl::data& entry_processor,
2040 const serialization::pimpl::data& predicate)
2042 auto request = protocol::codec::map_executewithpredicate_encode(
2043 get_name(), entry_processor, predicate);
2044 return invoke_and_get_future<EntryVector>(request);
2047std::shared_ptr<spi::impl::ListenerMessageCodec>
2048IMapImpl::create_map_entry_listener_codec(
2050 serialization::pimpl::data&& predicate,
2051 int32_t listener_flags)
2053 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2054 new MapEntryListenerWithPredicateMessageCodec(
2055 get_name(), include_value, listener_flags, std::move(predicate)));
2058std::shared_ptr<spi::impl::ListenerMessageCodec>
2059IMapImpl::create_map_entry_listener_codec(
bool include_value,
2060 int32_t listener_flags)
2062 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2063 new MapEntryListenerMessageCodec(
2064 get_name(), include_value, listener_flags));
2067std::shared_ptr<spi::impl::ListenerMessageCodec>
2068IMapImpl::create_map_entry_listener_codec(
bool include_value,
2069 int32_t listener_flags,
2070 serialization::pimpl::data&& key)
2072 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2073 new MapEntryListenerToKeyCodec(
2074 get_name(), include_value, listener_flags, std::move(key)));
2078IMapImpl::on_initialize()
2080 ProxyImpl::on_initialize();
2081 lock_reference_id_generator_ =
2082 get_context().get_lock_reference_id_generator();
2085IMapImpl::MapEntryListenerMessageCodec::MapEntryListenerMessageCodec(
2088 int32_t listener_flags)
2089 : name_(std::move(name))
2090 , include_value_(include_value)
2091 , listener_flags_(listener_flags)
2094protocol::ClientMessage
2095IMapImpl::MapEntryListenerMessageCodec::encode_add_request(
2096 bool local_only)
const
2098 return protocol::codec::map_addentrylistener_encode(
2099 name_, include_value_,
static_cast<int32_t
>(listener_flags_), local_only);
2102protocol::ClientMessage
2103IMapImpl::MapEntryListenerMessageCodec::encode_remove_request(
2104 boost::uuids::uuid real_registration_id)
const
2106 return protocol::codec::map_removeentrylistener_encode(
2107 name_, real_registration_id);
2110protocol::ClientMessage
2111IMapImpl::MapEntryListenerToKeyCodec::encode_add_request(
bool local_only)
const
2113 return protocol::codec::map_addentrylistenertokey_encode(
2117 static_cast<int32_t
>(listener_flags_),
2121protocol::ClientMessage
2122IMapImpl::MapEntryListenerToKeyCodec::encode_remove_request(
2123 boost::uuids::uuid real_registration_id)
const
2125 return protocol::codec::map_removeentrylistener_encode(
2126 name_, real_registration_id);
2129IMapImpl::MapEntryListenerToKeyCodec::MapEntryListenerToKeyCodec(
2132 int32_t listener_flags,
2133 serialization::pimpl::data key)
2134 : name_(std::move(name))
2135 , include_value_(include_value)
2136 , listener_flags_(listener_flags)
2137 , key_(std::move(key))
2140IMapImpl::MapEntryListenerWithPredicateMessageCodec::
2141 MapEntryListenerWithPredicateMessageCodec(
2144 int32_t listener_flags,
2145 serialization::pimpl::data&& predicate)
2146 : name_(std::move(name))
2147 , include_value_(include_value)
2148 , listener_flags_(listener_flags)
2149 , predicate_(std::move(predicate))
2152protocol::ClientMessage
2153IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_add_request(
2154 bool local_only)
const
2156 return protocol::codec::map_addentrylistenerwithpredicate_encode(
2160 static_cast<int32_t
>(listener_flags_),
2164protocol::ClientMessage
2165IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_remove_request(
2166 boost::uuids::uuid real_registration_id)
const
2168 return protocol::codec::map_removeentrylistener_encode(
2169 name_, real_registration_id);
2172TransactionalQueueImpl::TransactionalQueueImpl(
2173 const std::string& name,
2174 txn::TransactionProxy& transaction_proxy)
2175 : TransactionalObject(iqueue::SERVICE_NAME, name, transaction_proxy)
2179TransactionalQueueImpl::offer(
const serialization::pimpl::data& e,
2180 std::chrono::milliseconds timeout)
2182 auto request = protocol::codec::transactionalqueue_offer_encode(
2184 get_transaction_id(),
2185 util::get_current_thread_id(),
2187 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
2189 return invoke_and_get_future<bool>(request);
2192boost::future<boost::optional<serialization::pimpl::data>>
2193TransactionalQueueImpl::poll_data(std::chrono::milliseconds timeout)
2195 auto request = protocol::codec::transactionalqueue_poll_encode(
2197 get_transaction_id(),
2198 util::get_current_thread_id(),
2199 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
2201 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
2206TransactionalQueueImpl::size()
2208 auto request = protocol::codec::transactionalqueue_size_encode(
2209 get_name(), get_transaction_id(), util::get_current_thread_id());
2211 return invoke_and_get_future<int>(request);
2214ISetImpl::ISetImpl(
const std::string& instance_name,
2215 spi::ClientContext* client_context)
2216 : ProxyImpl(iset::SERVICE_NAME, instance_name, client_context)
2218 serialization::pimpl::data key_data =
2219 get_context().get_serialization_service().to_data<std::string>(
2221 partition_id_ = get_partition_id(key_data);
2225ISetImpl::remove_item_listener(boost::uuids::uuid registration_id)
2227 return get_context().get_client_listener_service().deregister_listener(
2234 auto request = protocol::codec::set_size_encode(get_name());
2235 return invoke_and_get_future<int>(request, partition_id_);
2241 auto request = protocol::codec::set_isempty_encode(get_name());
2242 return invoke_and_get_future<bool>(request, partition_id_);
2246ISetImpl::contains(
const serialization::pimpl::data& element)
2248 auto request = protocol::codec::set_contains_encode(get_name(), element);
2249 return invoke_and_get_future<bool>(request, partition_id_);
2252boost::future<std::vector<serialization::pimpl::data>>
2253ISetImpl::to_array_data()
2255 auto request = protocol::codec::set_getall_encode(get_name());
2256 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
2257 request, partition_id_);
2261ISetImpl::add(
const serialization::pimpl::data& element)
2263 auto request = protocol::codec::set_add_encode(get_name(), element);
2264 return invoke_and_get_future<bool>(request, partition_id_);
2268ISetImpl::remove(
const serialization::pimpl::data& element)
2270 auto request = protocol::codec::set_remove_encode(get_name(), element);
2271 return invoke_and_get_future<bool>(request, partition_id_);
2275ISetImpl::contains_all(
const std::vector<serialization::pimpl::data>& elements)
2278 protocol::codec::set_containsall_encode(get_name(), elements);
2279 return invoke_and_get_future<bool>(request, partition_id_);
2283ISetImpl::add_all(
const std::vector<serialization::pimpl::data>& elements)
2285 auto request = protocol::codec::set_addall_encode(get_name(), elements);
2286 return invoke_and_get_future<bool>(request, partition_id_);
2290ISetImpl::remove_all(
const std::vector<serialization::pimpl::data>& elements)
2293 protocol::codec::set_compareandremoveall_encode(get_name(), elements);
2294 return invoke_and_get_future<bool>(request, partition_id_);
2298ISetImpl::retain_all(
const std::vector<serialization::pimpl::data>& elements)
2301 protocol::codec::set_compareandretainall_encode(get_name(), elements);
2302 return invoke_and_get_future<bool>(request, partition_id_);
2308 auto request = protocol::codec::set_clear_encode(get_name());
2309 return to_void_future(invoke_on_partition(request, partition_id_));
2312std::shared_ptr<spi::impl::ListenerMessageCodec>
2313ISetImpl::create_item_listener_codec(
bool include_value)
2315 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2316 new SetListenerMessageCodec(get_name(), include_value));
2319ISetImpl::SetListenerMessageCodec::SetListenerMessageCodec(std::string name,
2321 : name_(std::move(name))
2322 , include_value_(include_value)
2325protocol::ClientMessage
2326ISetImpl::SetListenerMessageCodec::encode_add_request(
bool local_only)
const
2328 return protocol::codec::set_addlistener_encode(
2329 name_, include_value_, local_only);
2332protocol::ClientMessage
2333ISetImpl::SetListenerMessageCodec::encode_remove_request(
2334 boost::uuids::uuid real_registration_id)
const
2336 return protocol::codec::set_removelistener_encode(name_,
2337 real_registration_id);
2340ITopicImpl::ITopicImpl(
const std::string& instance_name,
2341 spi::ClientContext* context)
2342 : proxy::ProxyImpl(
"hz:impl:topicService", instance_name, context)
2343 , partition_id_(get_partition_id(to_data(instance_name)))
2347ITopicImpl::publish(
const serialization::pimpl::data& data)
2349 auto request = protocol::codec::topic_publish_encode(get_name(), data);
2350 return to_void_future(invoke_on_partition(request, partition_id_));
2353boost::future<boost::uuids::uuid>
2354ITopicImpl::add_message_listener(
2355 std::shared_ptr<impl::BaseEventHandler> topic_event_handler)
2357 return register_listener(create_item_listener_codec(),
2358 std::move(topic_event_handler));
2362ITopicImpl::remove_message_listener(boost::uuids::uuid registration_id)
2364 return get_context().get_client_listener_service().deregister_listener(
2368std::shared_ptr<spi::impl::ListenerMessageCodec>
2369ITopicImpl::create_item_listener_codec()
2371 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2372 new TopicListenerMessageCodec(get_name()));
2375ITopicImpl::TopicListenerMessageCodec::TopicListenerMessageCodec(
2377 : name_(std::move(name))
2380protocol::ClientMessage
2381ITopicImpl::TopicListenerMessageCodec::encode_add_request(
bool local_only)
const
2383 return protocol::codec::topic_addmessagelistener_encode(name_, local_only);
2386protocol::ClientMessage
2387ITopicImpl::TopicListenerMessageCodec::encode_remove_request(
2388 boost::uuids::uuid real_registration_id)
const
2390 return protocol::codec::topic_removemessagelistener_encode(
2391 name_, real_registration_id);
2394ReplicatedMapImpl::ReplicatedMapImpl(
const std::string& service_name,
2395 const std::string& object_name,
2396 spi::ClientContext* context)
2397 : ProxyImpl(service_name, object_name, context)
2398 , target_partition_id_(-1)
2401const int32_t RingbufferImpl::MAX_BATCH_SIZE{ 1000 };
2405const serialization::pimpl::data&
2406data_entry_view::get_key()
const
2411const serialization::pimpl::data&
2412data_entry_view::get_value()
const
2418data_entry_view::get_cost()
const
2424data_entry_view::get_creation_time()
const
2426 return creation_time_;
2430data_entry_view::get_expiration_time()
const
2432 return expiration_time_;
2436data_entry_view::get_hits()
const
2442data_entry_view::get_last_access_time()
const
2444 return last_access_time_;
2448data_entry_view::get_last_stored_time()
const
2450 return last_stored_time_;
2454data_entry_view::get_last_update_time()
const
2456 return last_update_time_;
2460data_entry_view::get_version()
const
2466data_entry_view::get_ttl()
const
2472data_entry_view::get_max_idle()
const
2477data_entry_view::data_entry_view(serialization::pimpl::data&& key,
2478 serialization::pimpl::data&& value,
2480 int64_t creation_time,
2481 int64_t expiration_time,
2483 int64_t last_access_time,
2484 int64_t last_stored_time,
2485 int64_t last_update_time,
2489 : key_(std::move(key))
2490 , value_(std::move(value))
2492 , creation_time_(creation_time)
2493 , expiration_time_(expiration_time)
2495 , last_access_time_(last_access_time)
2496 , last_stored_time_(last_stored_time)
2497 , last_update_time_(last_update_time)
2500 , max_idle_(max_idle)
2507ReliableTopicMessage::ReliableTopicMessage()
2508 : publish_time_(std::chrono::system_clock::now())
2511ReliableTopicMessage::ReliableTopicMessage(
2512 hazelcast::client::serialization::pimpl::data&& payload_data,
2513 std::unique_ptr<address> address)
2514 : publish_time_(std::chrono::system_clock::now())
2515 , payload_(std::move(payload_data))
2518 publisher_address_ = boost::make_optional(*address);
2522std::chrono::system_clock::time_point
2523ReliableTopicMessage::get_publish_time()
const
2525 return publish_time_;
2528const boost::optional<address>&
2529ReliableTopicMessage::get_publisher_address()
const
2531 return publisher_address_;
2534serialization::pimpl::data&
2535ReliableTopicMessage::get_payload()
2543namespace serialization {
2545hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_factory_id()
2551hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_class_id()
2553 return RELIABLE_TOPIC_MESSAGE;
2557hz_serializer<topic::impl::reliable::ReliableTopicMessage>::write_data(
2558 const topic::impl::reliable::ReliableTopicMessage&
object,
2559 object_data_output& out)
2561 out.write<int64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(
2562 object.publish_time_.time_since_epoch())
2564 out.write_object(
object.publisher_address_);
2565 out.write(
object.payload_.to_byte_array());
2568topic::impl::reliable::ReliableTopicMessage
2569hz_serializer<topic::impl::reliable::ReliableTopicMessage>::read_data(
2570 object_data_input& in)
2572 topic::impl::reliable::ReliableTopicMessage message;
2573 auto now = std::chrono::system_clock::now();
2574 message.publish_time_ = now +
2575 std::chrono::milliseconds(in.read<int64_t>()) -
2576 now.time_since_epoch();
2577 message.publisher_address_ = in.read_object<address>();
2579 serialization::pimpl::data(in.read<std::vector<byte>>().value());
2584entry_event::entry_event(
const std::string& name,
2589 typed_data&& old_value,
2590 typed_data&& merging_value)
2592 , member_(
std::move(member))
2593 , event_type_(event_type)
2594 , key_(
std::move(key))
2595 , value_(
std::move(value))
2596 , old_value_(
std::move(old_value))
2597 , merging_value_(
std::move(merging_value))
2621 return merging_value_;
2643operator<<(std::ostream& os,
const entry_event& event)
2645 os <<
"name: " <<
event.name_ <<
" member: " <<
event.member_
2646 <<
" eventType: " <<
static_cast<int>(
event.event_type_)
2647 <<
" key: " << event.key_.
get_type()
2648 <<
" value: " <<
event.value_.get_type()
2649 <<
" oldValue: " <<
event.old_value_.get_type()
2650 <<
" mergingValue: " <<
event.merging_value_.get_type();
2655 entry_event::type event_type,
2656 const std::string& name,
2657 int number_of_entries_affected)
2659 , event_type_(event_type)
2661 , number_of_entries_affected_(number_of_entries_affected)
2685 return number_of_entries_affected_;
2689operator<<(std::ostream& os,
const map_event& event)
2691 os <<
"MapEvent{member: " <<
event.member_
2692 <<
" eventType: " <<
static_cast<int>(
event.event_type_)
2693 <<
" name: " << event.name_
2694 <<
" numberOfEntriesAffected: " << event.number_of_entries_affected_;
2698item_event_base::item_event_base(
const std::string& name,
2699 const member& member,
2700 const item_event_type& event_type)
2703 , event_type_(event_type)
2724item_event_base::~item_event_base() =
default;
2726flake_id_generator::flake_id_generator(
const std::string& object_name,
2727 spi::ClientContext* context)
2728 : flake_id_generator_impl(SERVICE_NAME, object_name, context)
const typed_data & get_key() const
Returns the key of the entry event.
const std::string & get_name() const
Returns the name of the map for this event.
const typed_data & get_old_value() const
Returns the old value of the entry event.
type get_event_type() const
Return the event type.
const member & get_member() const
Returns the member fired this event.
const typed_data & get_value() const
Returns the value of the entry event.
const typed_data & get_merging_value() const
Returns the incoming merging value of the entry event.
static const std::chrono::milliseconds UNSET
Default TTL value of a record.
item_event_type get_event_type() const
Return the event type.
const member & get_member() const
Returns the member fired this event.
const std::string & get_name() const
Returns the name of the collection for this event.
Map events common contract.
const std::string & get_name() const
Returns the name of the map for this event.
entry_event::type get_event_type() const
Return the event type.
int get_number_of_entries_affected() const
Returns the number of entries affected by this event.
map_event(member &&member, entry_event::type event_type, const std::string &name, int number_of_entries_affected)
Constructor.
const member & get_member() const
Returns the member fired this event.
Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subs...
bool remove_message_listener(const std::string ®istration_id)
Stops receiving messages for the given message listener.
reliable_listener(bool loss_tolerant, int64_t initial_sequence_id=-1)
typed_data class is a wrapper class for the serialized binary data.
serialization::pimpl::object_type get_type() const