17 #include <unordered_set>
20 #include "hazelcast/client/impl/ClientLockReferenceIdGenerator.h"
21 #include "hazelcast/client/proxy/PNCounterImpl.h"
22 #include "hazelcast/client/spi/ClientContext.h"
23 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
24 #include "hazelcast/client/proxy/flake_id_generator_impl.h"
25 #include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
26 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
27 #include "hazelcast/client/client_config.h"
28 #include "hazelcast/client/map/data_entry_view.h"
29 #include "hazelcast/client/proxy/RingbufferImpl.h"
30 #include "hazelcast/client/impl/vector_clock.h"
31 #include "hazelcast/client/internal/partition/strategy/StringPartitioningStrategy.h"
32 #include "hazelcast/util/Util.h"
33 #include "hazelcast/client/topic/reliable_listener.h"
// Constructs the reliable-topic proxy. The base ProxyImpl receives the service
// name, instance name and context; the user executor and the logger are taken
// from the context.
39 reliable_topic::reliable_topic(
const std::string& instance_name,
40 spi::ClientContext* context)
41 : proxy::ProxyImpl(reliable_topic::SERVICE_NAME, instance_name, context)
42 , executor_(context->get_client_execution_service().get_user_executor())
43 , logger_(context->get_logger())
// Read batch size comes from the per-topic configuration when one is found.
// NOTE(review): the DEFAULT_READ_BATCH_SIZE assignment presumably sits in an
// else branch that is not visible in this excerpt — confirm.
45 auto reliable_config =
46 context->get_client_config().lookup_reliable_topic_config(instance_name);
47 if (reliable_config) {
48 batch_size_ = reliable_config->get_read_batch_size();
50 batch_size_ = config::reliable_topic_config::DEFAULT_READ_BATCH_SIZE;
// Looks up the ringbuffer named TOPIC_RB_PREFIX + instance_name.
// NOTE(review): the variable receiving this lookup is not visible here —
// presumably ringbuffer_; confirm.
54 context->get_hazelcast_client_implementation()
55 ->get_distributed_object<ringbuffer>(
56 std::string(reliable_topic::TOPIC_RB_PREFIX) + instance_name);
// Parses the textual registration id into an int and fetches the matching
// runner from runners_map_.
// NOTE(review): what is done with the runner afterwards is not visible in this
// excerpt.
60 reliable_topic::remove_message_listener(
const std::string& registration_id)
62 int id = util::IOUtil::to_value<int>(registration_id);
63 auto runner = runners_map_.get(
id);
// Shutdown hook: empties runners_map_ via clear() and cancels every runner in
// the collection that clear() hands back.
72 reliable_topic::on_shutdown()
75 for (
auto& entry : runners_map_.clear()) {
76 entry.second->cancel();
// Destroy hook: same clean-up as on_shutdown() — clears runners_map_ and
// cancels each runner returned by clear().
81 reliable_topic::on_destroy()
84 for (
auto& entry : runners_map_.clear()) {
85 entry.second->cancel();
// After the topic is destroyed, destroys the backing ringbuffer too and blocks
// on the returned future (.get()) until that destroy call has completed.
90 reliable_topic::post_destroy()
93 ringbuffer_.get()->destroy().get();
// Stores the loss-tolerance flag and the initial sequence id unchanged in the
// corresponding members.
97 reliable_listener::reliable_listener(
bool loss_tolerant,
98 int64_t initial_sequence_id)
99 : loss_tolerant_(loss_tolerant)
100 , initial_sequence_id_(initial_sequence_id)
// Starts the reference id counter at 0.
105 ClientLockReferenceIdGenerator::ClientLockReferenceIdGenerator()
106 : reference_id_counter_(0)
// Pre-increments the counter and returns the new value, so successive calls
// yield strictly increasing ids.
110 ClientLockReferenceIdGenerator::get_next_reference_id()
112 return ++reference_id_counter_;
// Constructs the multimap proxy under multi_map::SERVICE_NAME and caches the
// context's lock reference id generator for the lock/unlock calls.
117 MultiMapImpl::MultiMapImpl(
const std::string& instance_name,
118 spi::ClientContext* context)
119 : ProxyImpl(multi_map::SERVICE_NAME, instance_name, context)
123 lock_reference_id_generator_ =
124 get_context().get_lock_reference_id_generator();
// Encodes a multimap put (map name, key, value, calling thread id) and invokes
// it with `key` as the routing argument; the future yields a bool.
128 MultiMapImpl::put(
const serialization::pimpl::data& key,
129 const serialization::pimpl::data& value)
131 auto request = protocol::codec::multimap_put_encode(
132 get_name(), key, value, util::get_current_thread_id());
133 return invoke_and_get_future<bool>(request, key);
// Encodes a multimap get for `key` (with the calling thread id) and returns a
// future of the serialized values.
// NOTE(review): the arguments of the invoke call are cut off in this excerpt.
136 boost::future<std::vector<serialization::pimpl::data>>
137 MultiMapImpl::get_data(
const serialization::pimpl::data& key)
139 auto request = protocol::codec::multimap_get_encode(
140 get_name(), key, util::get_current_thread_id());
141 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Removes a single key/value pair: encodes a removeentry request (map name,
// key, value, calling thread id) and invokes it with `key` as the routing
// argument; the future yields a bool.
146 MultiMapImpl::remove(
const serialization::pimpl::data& key,
147 const serialization::pimpl::data& value)
149 auto request = protocol::codec::multimap_removeentry_encode(
150 get_name(), key, value, util::get_current_thread_id());
151 return invoke_and_get_future<bool>(request, key);
// Encodes a multimap remove for `key` (with the calling thread id) and returns
// a future of serialized values.
// NOTE(review): the arguments of the invoke call are cut off in this excerpt.
154 boost::future<std::vector<serialization::pimpl::data>>
155 MultiMapImpl::remove_data(
const serialization::pimpl::data& key)
157 auto request = protocol::codec::multimap_remove_encode(
158 get_name(), key, util::get_current_thread_id());
159 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Encodes a keyset request carrying only the map name and returns a future of
// serialized data items.
// NOTE(review): the arguments of the invoke call are cut off in this excerpt.
163 boost::future<std::vector<serialization::pimpl::data>>
164 MultiMapImpl::key_set_data()
166 auto request = protocol::codec::multimap_keyset_encode(get_name());
167 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Encodes a values request carrying only the map name and returns a future of
// serialized data items.
// NOTE(review): the arguments of the invoke call are cut off in this excerpt.
171 boost::future<std::vector<serialization::pimpl::data>>
172 MultiMapImpl::values_data()
174 auto request = protocol::codec::multimap_values_encode(get_name());
175 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
179 boost::future<EntryVector>
180 MultiMapImpl::entry_set_data()
182 auto request = protocol::codec::multimap_entryset_encode(get_name());
183 return invoke_and_get_future<EntryVector>(request);
// Encodes a containskey request (map name, key, calling thread id) and invokes
// it with `key` as the routing argument; the future yields a bool.
187 MultiMapImpl::contains_key(
const serialization::pimpl::data& key)
189 auto request = protocol::codec::multimap_containskey_encode(
190 get_name(), key, util::get_current_thread_id());
191 return invoke_and_get_future<bool>(request, key);
// Encodes a containsvalue request (map name, value) and invokes it without a
// key argument; the future yields a bool.
// NOTE(review): the declaration of `request` is not visible in this excerpt.
195 MultiMapImpl::contains_value(
const serialization::pimpl::data& value)
198 protocol::codec::multimap_containsvalue_encode(get_name(), value);
199 return invoke_and_get_future<bool>(request);
// Encodes a containsentry request (map name, key, value, calling thread id)
// and invokes it with `key` as the routing argument; the future yields a bool.
203 MultiMapImpl::contains_entry(
const serialization::pimpl::data& key,
204 const serialization::pimpl::data& value)
206 auto request = protocol::codec::multimap_containsentry_encode(
207 get_name(), key, value, util::get_current_thread_id());
208 return invoke_and_get_future<bool>(request, key);
// Encodes a multimap size request carrying only the map name; the future
// yields an int.
// NOTE(review): the enclosing function signature is not visible in this
// excerpt — presumably MultiMapImpl::size(); confirm.
214 auto request = protocol::codec::multimap_size_encode(get_name());
215 return invoke_and_get_future<int>(request);
// Encodes a clear request carrying only the map name, invokes it and converts
// the response future into a void future.
219 MultiMapImpl::clear()
221 auto request = protocol::codec::multimap_clear_encode(get_name());
222 return to_void_future(invoke(request));
// Encodes a valuecount request (map name, key, calling thread id) and invokes
// it with `key` as the routing argument; the future yields an int.
226 MultiMapImpl::value_count(
const serialization::pimpl::data& key)
228 auto request = protocol::codec::multimap_valuecount_encode(
229 get_name(), key, util::get_current_thread_id());
230 return invoke_and_get_future<int>(request, key);
// Registers `entry_event_handler` through register_listener() using the
// map-wide listener codec; the future yields the registration uuid.
// NOTE(review): the `include_value` parameter used below is declared on a line
// that is not visible in this excerpt.
233 boost::future<boost::uuids::uuid>
234 MultiMapImpl::add_entry_listener(
235 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
238 return register_listener(
239 create_multi_map_entry_listener_codec(include_value),
240 std::move(entry_event_handler));
// Key-specific overload: registers `entry_event_handler` through
// register_listener() using the to-key listener codec, into which `key` is
// moved; the future yields the registration uuid.
// NOTE(review): the `include_value` parameter is declared on a line that is
// not visible in this excerpt.
243 boost::future<boost::uuids::uuid>
244 MultiMapImpl::add_entry_listener(
245 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
247 serialization::pimpl::data&& key)
249 return register_listener(
250 create_multi_map_entry_listener_codec(include_value, std::move(key)),
251 std::move(entry_event_handler));
// Deregisters a listener through the client listener service.
// NOTE(review): the argument list of deregister_listener is cut off here —
// presumably registration_id; confirm.
255 MultiMapImpl::remove_entry_listener(boost::uuids::uuid registration_id)
257 return get_context().get_client_listener_service().deregister_listener(
// Convenience overload: delegates to lock(key, lease_time) with a lease time
// of -1 ms.
// NOTE(review): -1 presumably means "no lease expiry" on the member side —
// confirm against the protocol documentation.
262 MultiMapImpl::lock(
const serialization::pimpl::data& key)
264 return lock(key, std::chrono::milliseconds(-1));
// Encodes a lock request with the calling thread id, the lease time in
// milliseconds and a fresh lock reference id, sends it to the partition that
// owns `key`, and converts the response into a void future.
// NOTE(review): the leading encode arguments are not visible in this excerpt.
268 MultiMapImpl::lock(
const serialization::pimpl::data& key,
269 std::chrono::milliseconds lease_time)
271 auto request = protocol::codec::multimap_lock_encode(
274 util::get_current_thread_id(),
275 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
276 lock_reference_id_generator_->get_next_reference_id());
277 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
// Encodes an islocked request (map name, key — no thread id) and invokes it
// with `key` as the routing argument; the future yields a bool.
281 MultiMapImpl::is_locked(
const serialization::pimpl::data& key)
283 auto request = protocol::codec::multimap_islocked_encode(get_name(), key);
284 return invoke_and_get_future<bool>(request, key);
// Single-argument try-lock: encodes a trylock request with the calling thread
// id and a fresh lock reference id; the future yields a bool.
// NOTE(review): the name/key arguments and the lease/timeout values passed to
// the codec are not visible in this excerpt.
288 MultiMapImpl::try_lock(
const serialization::pimpl::data& key)
290 auto request = protocol::codec::multimap_trylock_encode(
293 util::get_current_thread_id(),
296 lock_reference_id_generator_->get_next_reference_id());
297 return invoke_and_get_future<bool>(request, key);
// Delegates to try_lock(key, timeout, lease_time) with a lease time of
// INT64_MAX milliseconds.
301 MultiMapImpl::try_lock(
const serialization::pimpl::data& key,
302 std::chrono::milliseconds timeout)
304 return try_lock(key, timeout, std::chrono::milliseconds(INT64_MAX));
// Encodes a trylock request with the calling thread id, the lease time, then
// the timeout (both as millisecond counts) and a fresh lock reference id; the
// future yields a bool.
// NOTE(review): the leading encode arguments are not visible in this excerpt.
308 MultiMapImpl::try_lock(
const serialization::pimpl::data& key,
309 std::chrono::milliseconds timeout,
310 std::chrono::milliseconds lease_time)
312 auto request = protocol::codec::multimap_trylock_encode(
315 util::get_current_thread_id(),
316 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
317 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
318 lock_reference_id_generator_->get_next_reference_id());
319 return invoke_and_get_future<bool>(request, key);
// Encodes an unlock request with the calling thread id and a fresh lock
// reference id, sends it to the partition that owns `key`, and converts the
// response into a void future.
// NOTE(review): the leading encode arguments are not visible in this excerpt.
323 MultiMapImpl::unlock(
const serialization::pimpl::data& key)
325 auto request = protocol::codec::multimap_unlock_encode(
328 util::get_current_thread_id(),
329 lock_reference_id_generator_->get_next_reference_id());
330 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
// Encodes a forceunlock request (map name, key, fresh lock reference id — no
// thread id), sends it to the partition that owns `key`, and converts the
// response into a void future.
334 MultiMapImpl::force_unlock(
const serialization::pimpl::data& key)
336 auto request = protocol::codec::multimap_forceunlock_encode(
337 get_name(), key, lock_reference_id_generator_->get_next_reference_id());
338 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
// Creates the listener codec for map-wide entry listeners, bound to this
// map's name and the include_value flag.
341 std::shared_ptr<spi::impl::ListenerMessageCodec>
342 MultiMapImpl::create_multi_map_entry_listener_codec(
bool include_value)
344 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
345 new MultiMapEntryListenerMessageCodec(get_name(), include_value));
// Creates the listener codec for key-specific entry listeners; `key` is moved
// into the codec.
// NOTE(review): the `include_value` parameter is declared on a line that is
// not visible in this excerpt.
348 std::shared_ptr<spi::impl::ListenerMessageCodec>
349 MultiMapImpl::create_multi_map_entry_listener_codec(
351 serialization::pimpl::data&& key)
353 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
354 new MultiMapEntryListenerToKeyCodec(
355 get_name(), include_value, std::move(key)));
// Runs the base-class initialisation, then re-reads the lock reference id
// generator from the context (the constructor performs the same assignment).
359 MultiMapImpl::on_initialize()
361 ProxyImpl::on_initialize();
362 lock_reference_id_generator_ =
363 get_context().get_lock_reference_id_generator();
// Takes the map name by value and moves it into name_; stores the
// include_value flag.
366 MultiMapImpl::MultiMapEntryListenerMessageCodec::
367 MultiMapEntryListenerMessageCodec(std::string name,
bool include_value)
368 : name_(std::move(name))
369 , include_value_(include_value)
// Builds the add-entry-listener message from the stored map name and
// include_value flag plus the caller's local_only flag.
372 protocol::ClientMessage
373 MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_add_request(
374 bool local_only)
const
376 return protocol::codec::multimap_addentrylistener_encode(
377 name_, include_value_, local_only);
// Builds the remove-entry-listener message for the stored map name and the
// given registration id.
380 protocol::ClientMessage
381 MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_remove_request(
382 boost::uuids::uuid real_registration_id)
const
384 return protocol::codec::multimap_removeentrylistener_encode(
385 name_, real_registration_id);
// Builds the add-entry-listener-to-key message from the stored map name, key
// and include_value flag plus the caller's local_only flag.
388 protocol::ClientMessage
389 MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_add_request(
390 bool local_only)
const
392 return protocol::codec::multimap_addentrylistenertokey_encode(
393 name_, key_, include_value_, local_only);
// Builds the remove message; it uses the same removeentrylistener codec as the
// map-wide listener and carries only the map name and registration id.
396 protocol::ClientMessage
397 MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_remove_request(
398 boost::uuids::uuid real_registration_id)
const
400 return protocol::codec::multimap_removeentrylistener_encode(
401 name_, real_registration_id);
// Moves the map name and the key into the codec and stores the include_value
// flag.
// NOTE(review): the `name` and `include_value` parameters are declared on
// lines that are not visible in this excerpt.
404 MultiMapImpl::MultiMapEntryListenerToKeyCodec::MultiMapEntryListenerToKeyCodec(
407 serialization::pimpl::data&& key)
408 : name_(std::move(name))
409 , include_value_(include_value)
410 , key_(std::move(key))
// Shared, initially empty member set passed as the "nothing excluded yet"
// argument by the counter operations. try_choose_a_new_target() compares
// against this pointer before inserting into an exclusion set.
413 const std::shared_ptr<std::unordered_set<member>>
414 PNCounterImpl::EMPTY_ADDRESS_LIST(
new std::unordered_set<member>());
// Constructs the PN counter proxy: replica count cache starts at 0 (meaning
// "not fetched yet", see get_max_configured_replica_count), a fresh empty
// vector clock is created and the logger is taken from the context.
// NOTE(review): the member initialised with the new vector clock is named on a
// line not visible here — presumably observed_clock_; confirm.
416 PNCounterImpl::PNCounterImpl(
const std::string& service_name,
417 const std::string& object_name,
418 spi::ClientContext* context)
419 : ProxyImpl(service_name, object_name, context)
420 , max_configured_replica_count_(0)
422 std::shared_ptr<impl::vector_clock>(new impl::vector_clock()))
423 , logger_(context->get_logger())
// Streams the proxy as PNCounter{name='<name>'}.
427 operator<<(std::ostream& os,
const PNCounterImpl& proxy)
429 os <<
"PNCounter{name='" << proxy.get_name() <<
"\'}";
// Reads the counter: picks a target replica with nothing excluded and issues
// the get through invoke_get_internal with no previous exception.
// NOTE(review): the function name line and the condition guarding the throw
// are not visible in this excerpt — presumably PNCounterImpl::get() throwing
// no_data_member_in_cluster when no target was found; confirm.
433 boost::future<int64_t>
436 boost::shared_ptr<member> target =
437 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
439 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
440 "ClientPNCounterProxy::get",
441 "Cannot invoke operations on a CRDT because the cluster does not "
442 "contain any data members"));
444 return invoke_get_internal(EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds `delta` via invoke_add_internal with the get-before-update flag set to
// true, starting with nothing excluded and no previous exception.
// NOTE(review): the condition guarding the no_data_member_in_cluster throw is
// not visible here — presumably a null `target`; confirm.
447 boost::future<int64_t>
448 PNCounterImpl::get_and_add(int64_t delta)
450 boost::shared_ptr<member> target =
451 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
453 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
454 "ClientPNCounterProxy::getAndAdd",
455 "Cannot invoke operations on a CRDT because the cluster does not "
456 "contain any data members"));
458 return invoke_add_internal(
459 delta,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds `delta` via invoke_add_internal with the get-before-update flag set to
// false, starting with nothing excluded and no previous exception.
// NOTE(review): the condition guarding the no_data_member_in_cluster throw is
// not visible here — presumably a null `target`; confirm.
462 boost::future<int64_t>
463 PNCounterImpl::add_and_get(int64_t delta)
465 boost::shared_ptr<member> target =
466 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
468 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
469 "ClientPNCounterProxy::addAndGet",
470 "Cannot invoke operations on a CRDT because the cluster does not "
471 "contain any data members"));
473 return invoke_add_internal(
474 delta,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
// Subtracts by adding -delta via invoke_add_internal with the
// get-before-update flag set to true.
// NOTE(review): -delta overflows when delta == INT64_MIN (signed overflow).
// NOTE(review): the condition guarding the no_data_member_in_cluster throw is
// not visible here — presumably a null `target`; confirm.
477 boost::future<int64_t>
478 PNCounterImpl::get_and_subtract(int64_t delta)
480 boost::shared_ptr<member> target =
481 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
483 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
484 "ClientPNCounterProxy::getAndSubtract",
485 "Cannot invoke operations on a CRDT because the cluster does not "
486 "contain any data members"));
488 return invoke_add_internal(
489 -delta,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
// Subtracts by adding -delta via invoke_add_internal with the
// get-before-update flag set to false.
// NOTE(review): -delta overflows when delta == INT64_MIN (signed overflow).
// NOTE(review): the condition guarding the no_data_member_in_cluster throw is
// not visible here — presumably a null `target`; confirm.
492 boost::future<int64_t>
493 PNCounterImpl::subtract_and_get(int64_t delta)
495 boost::shared_ptr<member> target =
496 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
498 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
499 "ClientPNCounterProxy::subtractAndGet",
500 "Cannot invoke operations on a CRDT because the cluster does not "
501 "contain any data members"));
503 return invoke_add_internal(
504 -delta,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds -1 via invoke_add_internal with the get-before-update flag set to
// false.
// NOTE(review): the condition guarding the no_data_member_in_cluster throw is
// not visible here — presumably a null `target`; confirm.
507 boost::future<int64_t>
508 PNCounterImpl::decrement_and_get()
510 boost::shared_ptr<member> target =
511 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
513 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
514 "ClientPNCounterProxy::decrementAndGet",
515 "Cannot invoke operations on a CRDT because the cluster does not "
516 "contain any data members"));
518 return invoke_add_internal(-1,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds 1 via invoke_add_internal with the get-before-update flag set to false.
// NOTE(review): the condition guarding the no_data_member_in_cluster throw is
// not visible here — presumably a null `target`; confirm.
521 boost::future<int64_t>
522 PNCounterImpl::increment_and_get()
524 boost::shared_ptr<member> target =
525 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
527 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
528 "ClientPNCounterProxy::incrementAndGet",
529 "Cannot invoke operations on a CRDT because the cluster does not "
530 "contain any data members"));
532 return invoke_add_internal(1,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds -1 via invoke_add_internal with the get-before-update flag set to true.
// NOTE(review): the condition guarding the no_data_member_in_cluster throw is
// not visible here — presumably a null `target`; confirm.
535 boost::future<int64_t>
536 PNCounterImpl::get_and_decrement()
538 boost::shared_ptr<member> target =
539 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
541 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
542 "ClientPNCounterProxy::getAndDecrement",
543 "Cannot invoke operations on a CRDT because the cluster does not "
544 "contain any data members"));
546 return invoke_add_internal(-1,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds 1 via invoke_add_internal with the get-before-update flag set to true.
// NOTE(review): the condition guarding the no_data_member_in_cluster throw is
// not visible here — presumably a null `target`; confirm.
549 boost::future<int64_t>
550 PNCounterImpl::get_and_increment()
552 boost::shared_ptr<member> target =
553 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
555 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
556 "ClientPNCounterProxy::getAndIncrement",
557 "Cannot invoke operations on a CRDT because the cluster does not "
558 "contain any data members"));
560 return invoke_add_internal(1,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
// Creates a fresh, empty vector clock and returns an already-completed future;
// no remote call is made in the visible code.
// NOTE(review): the assignment target of the new clock is on a line not
// visible here — presumably observed_clock_; confirm.
564 PNCounterImpl::reset()
567 std::shared_ptr<impl::vector_clock>(
new impl::vector_clock());
568 return boost::make_ready_future();
// Returns the replica to send the next CRDT operation to. The cached replica
// is reused when it exists and is not in `excluded_addresses`; otherwise a new
// one is chosen while holding target_selection_mutex_.
571 boost::shared_ptr<member>
572 PNCounterImpl::get_crdt_operation_target(
573 const std::unordered_set<member>& excluded_addresses)
// First check happens without taking the mutex.
575 auto replicaAddress = current_target_replica_address_.load();
576 if (replicaAddress &&
577 excluded_addresses.find(*replicaAddress) == excluded_addresses.end()) {
578 return replicaAddress;
// Re-read under the lock: another thread may have picked a replica already.
582 std::lock_guard<std::mutex> guard(target_selection_mutex_);
583 replicaAddress = current_target_replica_address_.load();
584 if (!replicaAddress || excluded_addresses.find(*replicaAddress) !=
585 excluded_addresses.end()) {
586 current_target_replica_address_ =
587 choose_target_replica(excluded_addresses);
590 return current_target_replica_address_;
// Picks one of the non-excluded replica members at random and returns a
// freshly allocated copy of it.
// NOTE(review): the return for the empty-list branch is not visible in this
// excerpt — presumably a null pointer; confirm.
593 boost::shared_ptr<member>
594 PNCounterImpl::choose_target_replica(
595 const std::unordered_set<member>& excluded_addresses)
597 std::vector<member> replicaAddresses =
598 get_replica_addresses(excluded_addresses);
599 if (replicaAddresses.empty()) {
// rand() shares process-wide state and `%` is slightly biased; acceptable for
// spreading load, not for anything needing uniformity.
604 int randomReplicaIndex = std::abs(rand()) % (int)replicaAddresses.size();
605 return boost::make_shared<member>(replicaAddresses[randomReplicaIndex]);
// Returns the candidate replica members: the first
// min(configured replica count, number of data members) entries of the data
// member list, minus any that appear in `excluded_members`.
609 PNCounterImpl::get_replica_addresses(
610 const std::unordered_set<member>& excluded_members)
612 std::vector<member> dataMembers =
613 get_context().get_client_cluster_service().get_members(
614 *member_selectors::DATA_MEMBER_SELECTOR);
615 int32_t replicaCount = get_max_configured_replica_count();
616 int currentReplicaCount =
617 util::min<int>(replicaCount, (
int)dataMembers.size());
619 std::vector<member> replicaMembers;
620 for (
int i = 0; i < currentReplicaCount; i++) {
621 if (excluded_members.find(dataMembers[i]) == excluded_members.end()) {
622 replicaMembers.push_back(dataMembers[i]);
625 return replicaMembers;
// Returns the configured replica count, fetching it from the cluster on first
// use (blocking on the future) and caching it; a cached value > 0 is returned
// directly.
// NOTE(review): the declaration of `request` and the encode arguments are not
// fully visible in this excerpt.
629 PNCounterImpl::get_max_configured_replica_count()
631 if (max_configured_replica_count_ > 0) {
632 return max_configured_replica_count_;
635 protocol::codec::pncounter_getconfiguredreplicacount_encode(
637 max_configured_replica_count_ =
638 invoke_and_get_future<int32_t>(request).get();
640 return max_configured_replica_count_;
// After a failed invocation on `last_target`, adds that member to the
// exclusion set and asks get_crdt_operation_target() for another replica.
// When the caller passed the shared EMPTY_ADDRESS_LIST, a private set is
// allocated first.
643 boost::shared_ptr<member>
644 PNCounterImpl::try_choose_a_new_target(
645 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
646 boost::shared_ptr<member> last_target,
647 const exception::hazelcast_& last_exception)
// NOTE(review): `last_target` is streamed as a smart pointer, not as *last_target;
// boost::shared_ptr's operator<< prints the stored pointer value — confirm
// whether the member itself was meant to be logged.
652 boost::str(boost::format(
653 "Exception occurred while invoking operation on target %1%, "
654 "choosing different target. Cause: %2%") %
655 last_target % last_exception));
// Pointer comparison, not an emptiness check.
656 if (excluded_addresses == EMPTY_ADDRESS_LIST) {
659 excluded_addresses = std::make_shared<std::unordered_set<member>>();
661 excluded_addresses->insert(*last_target);
662 return get_crdt_operation_target(*excluded_addresses);
// Sends a PN counter get to `target`, passing the entries of the currently
// observed vector clock, and folds the response through
// get_and_update_timestamps(). On a hazelcast_ exception — raised either while
// sending or when the response future is unwrapped — it retries on a replica
// chosen by try_choose_a_new_target(), carrying the exception along.
// NOTE(review): the condition guarding the rethrow/throw section is not
// visible in this excerpt — presumably a null `target`; confirm.
665 boost::future<int64_t>
666 PNCounterImpl::invoke_get_internal(
667 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
668 std::exception_ptr last_exception,
669 const boost::shared_ptr<member>& target)
// Prefer surfacing the last real failure over the generic "no data member".
672 if (last_exception) {
673 std::rethrow_exception(last_exception);
675 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
676 "ClientPNCounterProxy::invokeGetInternal",
677 "Cannot invoke operations on a CRDT because the cluster does not "
678 "contain any data members"));
682 auto timestamps = observed_clock_.get()->entry_set();
683 auto request = protocol::codec::pncounter_get_encode(
684 get_name(), timestamps, target->get_uuid());
685 return invoke_on_member(request, target->get_uuid())
// Continuation is launched with boost::launch::sync.
687 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
689 return get_and_update_timestamps(std::move(f));
690 }
catch (exception::hazelcast_& e) {
691 return invoke_get_internal(excluded_addresses,
692 std::current_exception(),
693 try_choose_a_new_target(
694 excluded_addresses, target, e))
698 }
catch (exception::hazelcast_& e) {
699 return invoke_get_internal(
701 std::current_exception(),
702 try_choose_a_new_target(excluded_addresses, target, e));
706 boost::future<int64_t>
707 PNCounterImpl::invoke_add_internal(
709 bool getBeforeUpdate,
710 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
711 std::exception_ptr last_exception,
712 const boost::shared_ptr<member>& target)
715 if (last_exception) {
716 std::rethrow_exception(last_exception);
718 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
719 "ClientPNCounterProxy::invokeGetInternal",
720 "Cannot invoke operations on a CRDT because the cluster does not "
721 "contain any data members"));
726 auto request = protocol::codec::pncounter_add_encode(
730 observed_clock_.get()->entry_set(),
732 return invoke_on_member(request, target->get_uuid())
734 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
736 return get_and_update_timestamps(std::move(f));
737 }
catch (exception::hazelcast_& e) {
738 return invoke_add_internal(delta,
741 std::current_exception(),
742 try_choose_a_new_target(
743 excluded_addresses, target, e))
747 }
catch (exception::hazelcast_& e) {
748 return invoke_add_internal(
752 std::current_exception(),
753 try_choose_a_new_target(excluded_addresses, target, e));
// Unpacks an invocation response: reads the first fixed-size field as the
// int64 counter value, then feeds the replica timestamp vector from the
// message into update_observed_replica_timestamps().
// NOTE(review): how `msg` is obtained from `f` and the return statement are not
// visible in this excerpt.
758 PNCounterImpl::get_and_update_timestamps(
759 boost::future<protocol::ClientMessage> f)
762 auto value = msg.get_first_fixed_sized_field<int64_t>();
765 update_observed_replica_timestamps(
766 msg.get<impl::vector_clock::timestamp_vector>());
// Replaces the observed vector clock with the received one using
// compare_and_set, unless the current clock is already after the received one.
// NOTE(review): the enclosing loop and the statements inside both `if` bodies
// are not visible in this excerpt — presumably a retry loop that exits in
// either case; confirm.
771 PNCounterImpl::update_observed_replica_timestamps(
772 const impl::vector_clock::timestamp_vector& received_logical_timestamps)
774 std::shared_ptr<impl::vector_clock> received =
775 to_vector_clock(received_logical_timestamps);
777 std::shared_ptr<impl::vector_clock> currentClock =
778 this->observed_clock_;
779 if (currentClock->is_after(*received)) {
782 if (observed_clock_.compare_and_set(currentClock, received)) {
788 std::shared_ptr<impl::vector_clock>
789 PNCounterImpl::to_vector_clock(
790 const impl::vector_clock::timestamp_vector& replica_logical_timestamps)
792 return std::shared_ptr<impl::vector_clock>(
793 new impl::vector_clock(replica_logical_timestamps));
796 boost::shared_ptr<member>
797 PNCounterImpl::get_current_target_replica_address()
799 return current_target_replica_address_.load();
// Constructs the list proxy under service "hz:impl:listService" and computes
// partition_id_ once from a serialized std::string key; every list operation
// in this file is then sent with that partition id.
// NOTE(review): the string being serialized is on a line not visible here —
// presumably the list name; confirm.
802 IListImpl::IListImpl(
const std::string& instance_name,
803 spi::ClientContext* context)
804 : ProxyImpl(
"hz:impl:listService", instance_name, context)
806 serialization::pimpl::data key_data =
807 get_context().get_serialization_service().to_data<std::string>(
809 partition_id_ = get_partition_id(key_data);
// Deregisters an item listener through the client listener service.
// NOTE(review): the argument list of deregister_listener is cut off here —
// presumably registration_id; confirm.
813 IListImpl::remove_item_listener(boost::uuids::uuid registration_id)
815 return get_context().get_client_listener_service().deregister_listener(
// Encodes a list size request and invokes it on the list's partition; the
// future yields an int.
// NOTE(review): the enclosing function signature is not visible in this
// excerpt — presumably IListImpl::size(); confirm.
822 auto request = protocol::codec::list_size_encode(get_name());
823 return invoke_and_get_future<int>(request, partition_id_);
// Encodes an isempty request and invokes it on the list's partition; the
// future yields a bool.
827 IListImpl::is_empty()
829 auto request = protocol::codec::list_isempty_encode(get_name());
830 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a contains request for `element` and invokes it on the list's
// partition; the future yields a bool.
834 IListImpl::contains(
const serialization::pimpl::data& element)
836 auto request = protocol::codec::list_contains_encode(get_name(), element);
837 return invoke_and_get_future<bool>(request, partition_id_);
840 boost::future<std::vector<serialization::pimpl::data>>
841 IListImpl::to_array_data()
843 auto request = protocol::codec::list_getall_encode(get_name());
844 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
845 request, partition_id_);
// Encodes an add request for `element` and invokes it on the list's partition;
// the future yields a bool.
849 IListImpl::add(
const serialization::pimpl::data& element)
851 auto request = protocol::codec::list_add_encode(get_name(), element);
852 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a remove request for `element` and invokes it on the list's
// partition; the future yields a bool.
856 IListImpl::remove(
const serialization::pimpl::data& element)
858 auto request = protocol::codec::list_remove_encode(get_name(), element);
859 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a containsall request for `elements` and invokes it on the list's
// partition; the future yields a bool.
// NOTE(review): the declaration of `request` is not visible in this excerpt.
863 IListImpl::contains_all_data(
864 const std::vector<serialization::pimpl::data>& elements)
867 protocol::codec::list_containsall_encode(get_name(), elements);
868 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes an addall request for `elements` and invokes it on the list's
// partition; the future yields a bool.
872 IListImpl::add_all_data(
const std::vector<serialization::pimpl::data>& elements)
874 auto request = protocol::codec::list_addall_encode(get_name(), elements);
875 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes an addallwithindex request (list name, index, elements) and invokes
// it on the list's partition; the future yields a bool.
// NOTE(review): the declaration of `request` is not visible in this excerpt.
879 IListImpl::add_all_data(
int index,
880 const std::vector<serialization::pimpl::data>& elements)
883 protocol::codec::list_addallwithindex_encode(get_name(), index, elements);
884 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a compareandremoveall request for `elements` and invokes it on the
// list's partition; the future yields a bool.
// NOTE(review): the declaration of `request` is not visible in this excerpt.
888 IListImpl::remove_all_data(
889 const std::vector<serialization::pimpl::data>& elements)
892 protocol::codec::list_compareandremoveall_encode(get_name(), elements);
893 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a compareandretainall request for `elements` and invokes it on the
// list's partition; the future yields a bool.
// NOTE(review): the declaration of `request` is not visible in this excerpt.
897 IListImpl::retain_all_data(
898 const std::vector<serialization::pimpl::data>& elements)
901 protocol::codec::list_compareandretainall_encode(get_name(), elements);
902 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a list clear request, sends it to the list's partition and converts
// the response into a void future.
// NOTE(review): the enclosing function signature is not visible in this
// excerpt — presumably IListImpl::clear(); confirm.
908 auto request = protocol::codec::list_clear_encode(get_name());
909 return to_void_future(invoke_on_partition(request, partition_id_));
912 boost::future<boost::optional<serialization::pimpl::data>>
913 IListImpl::get_data(
int index)
915 auto request = protocol::codec::list_get_encode(get_name(), index);
916 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
917 request, partition_id_);
920 boost::future<boost::optional<serialization::pimpl::data>>
921 IListImpl::set_data(
int index,
const serialization::pimpl::data& element)
923 auto request = protocol::codec::list_set_encode(get_name(), index, element);
924 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
925 request, partition_id_);
// Encodes an addwithindex request (list name, index, element), sends it to the
// list's partition and converts the response into a void future.
// NOTE(review): the declaration of `request` is not visible in this excerpt.
929 IListImpl::add(
int index,
const serialization::pimpl::data& element)
932 protocol::codec::list_addwithindex_encode(get_name(), index, element);
933 return to_void_future(invoke_on_partition(request, partition_id_));
// Encodes a removewithindex request for `index` and invokes it on the list's
// partition; the future yields an optional data value.
// NOTE(review): the declaration of `request` is not visible in this excerpt.
936 boost::future<boost::optional<serialization::pimpl::data>>
937 IListImpl::remove_data(
int index)
940 protocol::codec::list_removewithindex_encode(get_name(), index);
941 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
942 request, partition_id_);
// Encodes an indexof request for `element` and invokes it on the list's
// partition; the future yields an int.
946 IListImpl::index_of(
const serialization::pimpl::data& element)
948 auto request = protocol::codec::list_indexof_encode(get_name(), element);
949 return invoke_and_get_future<int>(request, partition_id_);
// Encodes a lastindexof request for `element` and invokes it on the list's
// partition; the future yields an int.
// NOTE(review): the declaration of `request` is not visible in this excerpt.
953 IListImpl::last_index_of(
const serialization::pimpl::data& element)
956 protocol::codec::list_lastindexof_encode(get_name(), element);
957 return invoke_and_get_future<int>(request, partition_id_);
// Encodes a sub-list request with from_index and to_index and invokes it on
// the list's partition; the future yields serialized data items.
// NOTE(review): the declaration of `request` is not visible in this excerpt.
960 boost::future<std::vector<serialization::pimpl::data>>
961 IListImpl::sub_list_data(
int from_index,
int to_index)
964 protocol::codec::list_sub_encode(get_name(), from_index, to_index);
965 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
966 request, partition_id_);
// Creates the item listener codec bound to this list's name and the
// include_value flag.
969 std::shared_ptr<spi::impl::ListenerMessageCodec>
970 IListImpl::create_item_listener_codec(
bool include_value)
972 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
973 new ListListenerMessageCodec(get_name(), include_value));
// Moves the list name into name_ and stores the include_value flag.
// NOTE(review): the parameter list is not visible in this excerpt.
976 IListImpl::ListListenerMessageCodec::ListListenerMessageCodec(
979 : name_(std::move(name))
980 , include_value_(include_value)
// Builds the add-listener message from the stored list name and include_value
// flag plus the caller's local_only flag.
983 protocol::ClientMessage
984 IListImpl::ListListenerMessageCodec::encode_add_request(
bool local_only)
const
986 return protocol::codec::list_addlistener_encode(
987 name_, include_value_, local_only);
// Builds the remove-listener message for the stored list name and the given
// registration id.
990 protocol::ClientMessage
991 IListImpl::ListListenerMessageCodec::encode_remove_request(
992 boost::uuids::uuid real_registration_id)
const
994 return protocol::codec::list_removelistener_encode(name_,
995 real_registration_id);
// Stores the id batch and computes the instant (steady clock) at which this
// block stops being valid: now + validity.
// Note: `id_batch` is a named rvalue reference, so id_batch_(id_batch) copies
// rather than moves.
998 flake_id_generator_impl::Block::Block(IdBatch&& id_batch,
999 std::chrono::milliseconds validity)
1000 : id_batch_(id_batch)
1001 , invalid_since_(std::chrono::steady_clock::now() + validity)
// Hands out the next id of this block: claims the next unused index with a
// compare-exchange retry loop on num_returned_ and returns
// base + index * increment.
// NOTE(review): the values returned when the block has expired or is exhausted
// are not visible in this excerpt; new_id_internal() compares the result with
// INT64_MIN, which suggests that sentinel — confirm.
1006 flake_id_generator_impl::Block::next()
1008 if (invalid_since_ <= std::chrono::steady_clock::now()) {
1013 index = num_returned_;
1014 if (index == id_batch_.get_batch_size()) {
1017 }
while (!num_returned_.compare_exchange_strong(index, index + 1));
1019 return id_batch_.get_base() + index * id_batch_.get_increment();
// Definition of the static, default-constructed iterator endOfBatch.
// NOTE(review): presumably the sentinel returned by IdBatch::end() — its body
// is not visible in this excerpt.
1022 flake_id_generator_impl::IdBatch::IdIterator
1023 flake_id_generator_impl::IdBatch::endOfBatch;
// Const accessors of IdBatch (base, increment, batch size).
// NOTE(review): return types and bodies are not visible in this excerpt —
// presumably they return base_, increment_ and batch_size_; confirm.
1026 flake_id_generator_impl::IdBatch::get_base()
const
1032 flake_id_generator_impl::IdBatch::get_increment()
const
1038 flake_id_generator_impl::IdBatch::get_batch_size()
const
// Stores the increment and batch size of the batch.
// NOTE(review): the remaining parameters and the first initializer are not
// visible in this excerpt — presumably base_(base); confirm.
1043 flake_id_generator_impl::IdBatch::IdBatch(int64_t base,
1047 , increment_(increment)
1048 , batch_size_(batch_size)
// Returns a reference to the end-of-batch iterator.
// NOTE(review): the body is not visible in this excerpt — presumably returns
// the static endOfBatch; confirm.
1051 flake_id_generator_impl::IdBatch::IdIterator&
1052 flake_id_generator_impl::IdBatch::end()
// Creates an iterator positioned at this batch's base, using its increment and
// with batch_size_ ids remaining.
1057 flake_id_generator_impl::IdBatch::IdIterator
1058 flake_id_generator_impl::IdBatch::iterator()
1060 return flake_id_generator_impl::IdBatch::IdIterator(
1061 base_, increment_, batch_size_);
// Stores the increment and the number of remaining ids.
// NOTE(review): the other parameters and the first initializer are not visible
// in this excerpt — presumably base2_ is initialised from the base; confirm.
1064 flake_id_generator_impl::IdBatch::IdIterator::IdIterator(
1066 const int64_t increment,
1069 , increment_(increment)
1070 , remaining_(remaining)
// Two iterators are equal when current base, increment and remaining count all
// match.
1074 flake_id_generator_impl::IdBatch::IdIterator::operator==(
1075 const flake_id_generator_impl::IdBatch::IdIterator& rhs)
const
1077 return base2_ == rhs.base2_ && increment_ == rhs.increment_ &&
1078 remaining_ == rhs.remaining_;
// Negation of operator==.
1082 flake_id_generator_impl::IdBatch::IdIterator::operator!=(
1083 const flake_id_generator_impl::IdBatch::IdIterator& rhs)
const
1085 return !(rhs == *
this);
// Default constructor.
// NOTE(review): the member initialisers are not visible in this excerpt.
1088 flake_id_generator_impl::IdBatch::IdIterator::IdIterator()
// Advances to the next id by adding increment_ to the current base. With
// nothing remaining, the end-of-batch iterator is returned instead.
// NOTE(review): the decrement of remaining_ and the final return are not
// visible in this excerpt.
1094 flake_id_generator_impl::IdBatch::IdIterator&
1095 flake_id_generator_impl::IdBatch::IdIterator::operator++()
1097 if (remaining_ == 0) {
1098 return flake_id_generator_impl::IdBatch::end();
1103 base2_ += increment_;
// Constructs the flake id generator proxy and reads the prefetch count (batch
// size) and prefetch validity duration from the generator config found for
// `object_name`.
// NOTE(review): `config` is dereferenced without a visible null check; its
// declaration is not visible in this excerpt — confirm the lookup cannot
// return null.
1108 flake_id_generator_impl::flake_id_generator_impl(
1109 const std::string& service_name,
1110 const std::string& object_name,
1111 spi::ClientContext* context)
1112 : ProxyImpl(service_name, object_name, context)
1116 context->get_client_config().find_flake_id_generator_config(object_name);
1117 batch_size_ = config->get_prefetch_count();
1118 validity_ = config->get_prefetch_validity_duration();
// Tries to take an id from the currently loaded block. INT64_MIN from
// Block::next() is treated as "no id available", and std::overflow_error (with
// an empty message) signals the caller that a new batch must be fetched.
// NOTE(review): the null check on `b` and the success return are not visible
// in this excerpt.
1122 flake_id_generator_impl::new_id_internal()
1124 auto b = block_.load();
1126 int64_t res = b->next();
1127 if (res != INT64_MIN) {
1132 throw std::overflow_error(
"");
// Returns a new id. An id from the local block is returned as a ready future.
// When the local block cannot supply one (std::overflow_error), a new batch of
// batch_size_ ids is requested; the first id of the new block is taken and the
// block is installed with a compare-exchange against the previously loaded
// block.
// NOTE(review): the continuation's return statement is not visible in this
// excerpt.
1135 boost::future<int64_t>
1136 flake_id_generator_impl::new_id()
1139 return boost::make_ready_future(new_id_internal());
1140 }
catch (std::overflow_error&) {
1141 return new_id_batch(batch_size_)
1142 .then(boost::launch::sync,
1143 [=](boost::future<flake_id_generator_impl::IdBatch> f) {
1145 boost::make_shared<Block>(f.get(), validity_);
1146 auto value = newBlock->next();
1147 auto b = block_.load();
// The exchange result is not checked in the visible code.
1148 block_.compare_exchange_strong(b, newBlock);
// Requests a batch of `size` ids from the cluster and decodes the response
// into an IdBatch: after skipping the response header, it reads base (int64),
// increment (int64) and batch size (int32), in that order.
// NOTE(review): the declarations of `request` and `msg` are not visible in
// this excerpt.
1154 boost::future<flake_id_generator_impl::IdBatch>
1155 flake_id_generator_impl::new_id_batch(int32_t size)
1158 protocol::codec::flakeidgenerator_newidbatch_encode(get_name(), size);
1159 return invoke(request).then(
1160 boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
1162 msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN);
1164 auto base = msg.get<int64_t>();
1165 auto increment = msg.get<int64_t>();
1166 auto batch_size = msg.get<int32_t>();
1167 return flake_id_generator_impl::IdBatch(base, increment, batch_size);
// Constructs the queue proxy under service "hz:impl:queueService" and computes
// partition_id_ once from a serialized std::string key; every queue operation
// in this file is then sent with that partition id.
// NOTE(review): the string being serialized is on a line not visible here —
// presumably the queue name; confirm.
1171 IQueueImpl::IQueueImpl(
const std::string& instance_name,
1172 spi::ClientContext* context)
1173 : ProxyImpl(
"hz:impl:queueService", instance_name, context)
1175 serialization::pimpl::data data =
1176 get_context().get_serialization_service().to_data<std::string>(
1178 partition_id_ = get_partition_id(data);
// Deregisters an item listener through the client listener service.
// NOTE(review): the argument list of deregister_listener is cut off here —
// presumably registration_id; confirm.
1182 IQueueImpl::remove_item_listener(boost::uuids::uuid registration_id)
1184 return get_context().get_client_listener_service().deregister_listener(
// Encodes an offer request with the timeout as a millisecond count and invokes
// it on the queue's partition; the future yields a bool.
// NOTE(review): the leading encode arguments are not visible in this excerpt.
1189 IQueueImpl::offer(
const serialization::pimpl::data& element,
1190 std::chrono::milliseconds timeout)
1192 auto request = protocol::codec::queue_offer_encode(
1195 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1196 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a put request for `element`, sends it to the queue's partition and
// converts the response into a void future.
1200 IQueueImpl::put(
const serialization::pimpl::data& element)
1202 auto request = protocol::codec::queue_put_encode(get_name(), element);
1203 return to_void_future(invoke_on_partition(request, partition_id_));
// Encodes a queue poll request carrying the timeout as a millisecond count
// and invokes it on partition_id_. The future holds an optional serialized
// element.
1206 boost::future<boost::optional<serialization::pimpl::data>>
1207 IQueueImpl::poll_data(std::chrono::milliseconds timeout)
1209 auto request = protocol::codec::queue_poll_encode(
1211 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1212 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1213 request, partition_id_);
// Encodes a queue remaining-capacity request for get_name() and invokes it on
// partition_id_, returning the result of invoke_and_get_future<int>.
1217 IQueueImpl::remaining_capacity()
1219 auto request = protocol::codec::queue_remainingcapacity_encode(get_name());
1220 return invoke_and_get_future<int>(request, partition_id_);
// Encodes a queue remove request for `element` and invokes it on
// partition_id_, returning the result of invoke_and_get_future<bool>.
1224 IQueueImpl::remove(
const serialization::pimpl::data& element)
1226 auto request = protocol::codec::queue_remove_encode(get_name(), element);
1227 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a queue contains request for `element` and invokes it on
// partition_id_, returning the result of invoke_and_get_future<bool>.
1231 IQueueImpl::contains(
const serialization::pimpl::data& element)
1233 auto request = protocol::codec::queue_contains_encode(get_name(), element);
1234 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a queue drain-to-max-size request and invokes it on partition_id_;
// the future holds the drained elements in serialized form.
// NOTE(review): max_elements is narrowed from size_t to int32_t with a C-style
// cast, so values above the int32_t range are not preserved.
1237 boost::future<std::vector<serialization::pimpl::data>>
1238 IQueueImpl::drain_to_data(
size_t max_elements)
1240 auto request = protocol::codec::queue_draintomaxsize_encode(
1241 get_name(), (int32_t)max_elements);
1243 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1244 request, partition_id_);
// Overload without a size argument: encodes a queue drain-to request for
// get_name() and invokes it on partition_id_, yielding serialized elements.
1247 boost::future<std::vector<serialization::pimpl::data>>
1248 IQueueImpl::drain_to_data()
1250 auto request = protocol::codec::queue_drainto_encode(get_name());
1251 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1252 request, partition_id_);
// Encodes a queue take request for get_name() and invokes it on
// partition_id_; the future holds an optional serialized element.
1255 boost::future<boost::optional<serialization::pimpl::data>>
1256 IQueueImpl::take_data()
1258 auto request = protocol::codec::queue_take_encode(get_name());
1259 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1260 request, partition_id_);
// Encodes a queue peek request for get_name() and invokes it on
// partition_id_; the future holds an optional serialized element.
1263 boost::future<boost::optional<serialization::pimpl::data>>
1264 IQueueImpl::peek_data()
1266 auto request = protocol::codec::queue_peek_encode(get_name());
1267 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1268 request, partition_id_);
// Body of a queue size query (the function header is not visible in this
// listing; presumably IQueueImpl::size -- confirm). Encodes a queue size
// request and invokes it on partition_id_ via invoke_and_get_future<int>.
1274 auto request = protocol::codec::queue_size_encode(get_name());
1275 return invoke_and_get_future<int>(request, partition_id_);
// Encodes a queue is-empty request for get_name() and invokes it on
// partition_id_, returning the result of invoke_and_get_future<bool>.
1279 IQueueImpl::is_empty()
1281 auto request = protocol::codec::queue_isempty_encode(get_name());
1282 return invoke_and_get_future<bool>(request, partition_id_);
// Fetches the queue contents in serialized form: encodes a queue iterator
// request for get_name() and invokes it on partition_id_.
1285 boost::future<std::vector<serialization::pimpl::data>>
1286 IQueueImpl::to_array_data()
1288 auto request = protocol::codec::queue_iterator_encode(get_name());
1289 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1290 request, partition_id_);
// Encodes a queue contains-all request for `elements` and invokes it on
// partition_id_, returning the result of invoke_and_get_future<bool>.
1294 IQueueImpl::contains_all_data(
1295 const std::vector<serialization::pimpl::data>& elements)
1298 protocol::codec::queue_containsall_encode(get_name(), elements);
1299 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a queue add-all request for `elements` and invokes it on
// partition_id_, returning the result of invoke_and_get_future<bool>.
1303 IQueueImpl::add_all_data(
1304 const std::vector<serialization::pimpl::data>& elements)
1306 auto request = protocol::codec::queue_addall_encode(get_name(), elements);
1307 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a queue compare-and-remove-all request for `elements` and invokes
// it on partition_id_, returning the result of invoke_and_get_future<bool>.
1311 IQueueImpl::remove_all_data(
1312 const std::vector<serialization::pimpl::data>& elements)
1315 protocol::codec::queue_compareandremoveall_encode(get_name(), elements);
1316 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a queue compare-and-retain-all request for `elements` and invokes
// it on partition_id_, returning the result of invoke_and_get_future<bool>.
1320 IQueueImpl::retain_all_data(
1321 const std::vector<serialization::pimpl::data>& elements)
1324 protocol::codec::queue_compareandretainall_encode(get_name(), elements);
1325 return invoke_and_get_future<bool>(request, partition_id_);
// Body of a queue clear operation (the function header is not visible in this
// listing; presumably IQueueImpl::clear -- confirm). Encodes a queue clear
// request, invokes it on partition_id_, and converts via to_void_future.
1331 auto request = protocol::codec::queue_clear_encode(get_name());
1332 return to_void_future(invoke_on_partition(request, partition_id_));
// Creates the listener codec used for queue item listeners: a
// QueueListenerMessageCodec bound to get_name() and `include_value`, owned by
// a shared_ptr to the ListenerMessageCodec base.
1335 std::shared_ptr<spi::impl::ListenerMessageCodec>
1336 IQueueImpl::create_item_listener_codec(
bool include_value)
1338 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1339 new QueueListenerMessageCodec(get_name(), include_value));
// Stores the (moved) name and the include_value flag for later request
// encoding. The parameter list lines are not visible in this listing.
1342 IQueueImpl::QueueListenerMessageCodec::QueueListenerMessageCodec(
1345 : name_(std::move(name))
1346 , include_value_(include_value)
// Builds the queue add-listener request from the stored name_ and
// include_value_ plus the caller-supplied local_only flag.
1349 protocol::ClientMessage
1350 IQueueImpl::QueueListenerMessageCodec::encode_add_request(
bool local_only)
const
1352 return protocol::codec::queue_addlistener_encode(
1353 name_, include_value_, local_only);
// Builds the queue remove-listener request for name_ and the given
// registration id.
1356 protocol::ClientMessage
1357 IQueueImpl::QueueListenerMessageCodec::encode_remove_request(
1358 boost::uuids::uuid real_registration_id)
const
1360 return protocol::codec::queue_removelistener_encode(name_,
1361 real_registration_id);
// Initializes both bases from the same context. Note the argument order:
// ClientProxy receives (object_name, service_name, context), the reverse of
// this constructor's (service_name, object_name) parameter order.
1364 ProxyImpl::ProxyImpl(
const std::string& service_name,
1365 const std::string& object_name,
1366 spi::ClientContext* context)
1367 : ClientProxy(object_name, service_name, *context)
1368 , SerializingProxy(*context, object_name)
// Defaulted destructor, defined out of line.
1371 ProxyImpl::~ProxyImpl() =
default;
// Captures the serialization service and partition service obtained from
// `context`, together with the object name and the context itself.
1373 SerializingProxy::SerializingProxy(spi::ClientContext& context,
1374 const std::string& object_name)
1375 : serialization_service_(context.get_serialization_service())
1376 , partition_service_(context.get_partition_service())
1377 , object_name_(object_name)
1378 , client_context_(context)
// Resolves the partition id of a serialized key by delegating to
// partition_service_.
1382 SerializingProxy::get_partition_id(
const serialization::pimpl::data& key)
1384 return partition_service_.get_partition_id(key);
// Creates a ClientInvocation around `request`, which is moved into a
// shared_ptr and therefore left in a moved-from state for the caller.
// Remaining create() arguments are not visible here. An
// exception::iexception is re-thrown through util::exception_util::rethrow;
// the ready future after it presumably only satisfies the compiler's
// return-path check -- confirm that rethrow never returns.
1387 boost::future<protocol::ClientMessage>
1388 SerializingProxy::invoke_on_partition(protocol::ClientMessage& request,
1392 return spi::impl::ClientInvocation::create(
1394 std::make_shared<protocol::ClientMessage>(std::move(request)),
1398 }
catch (exception::iexception&) {
1399 util::exception_util::rethrow(std::current_exception());
1400 return boost::make_ready_future(protocol::ClientMessage(0));
// Creates a ClientInvocation around `request` (moved into a shared_ptr, so
// the caller's message is left moved-from). Remaining create() arguments are
// not visible here. An exception::iexception is re-thrown through
// util::exception_util::rethrow; the ready future after it presumably only
// satisfies the compiler's return-path check -- confirm.
1404 boost::future<protocol::ClientMessage>
1405 SerializingProxy::invoke(protocol::ClientMessage& request)
1408 return spi::impl::ClientInvocation::create(
1410 std::make_shared<protocol::ClientMessage>(std::move(request)),
1413 }
catch (exception::iexception&) {
1414 util::exception_util::rethrow(std::current_exception());
1415 return boost::make_ready_future(protocol::ClientMessage(0));
// Creates a ClientInvocation around `request` (moved into a shared_ptr) for
// the connection-targeted overload; how `connection` is passed to create()
// is not visible here. An exception::iexception is re-thrown through
// util::exception_util::rethrow; the ready future after it presumably only
// satisfies the compiler's return-path check -- confirm.
1419 boost::future<protocol::ClientMessage>
1420 SerializingProxy::invoke_on_connection(
1421 protocol::ClientMessage& request,
1422 std::shared_ptr<connection::Connection> connection)
1425 return spi::impl::ClientInvocation::create(
1427 std::make_shared<protocol::ClientMessage>(std::move(request)),
1431 }
catch (exception::iexception&) {
1432 util::exception_util::rethrow(std::current_exception());
1433 return boost::make_ready_future(protocol::ClientMessage(0));
// Routes `request` to the partition that owns `key_data` by delegating to
// invoke_on_partition with get_partition_id(key_data). An
// exception::iexception is re-thrown through util::exception_util::rethrow;
// the ready future after it presumably only satisfies the compiler -- confirm.
1437 boost::future<protocol::ClientMessage>
1438 SerializingProxy::invoke_on_key_owner(
1439 protocol::ClientMessage& request,
1440 const serialization::pimpl::data& key_data)
1443 return invoke_on_partition(request, get_partition_id(key_data));
1444 }
catch (exception::iexception&) {
1445 util::exception_util::rethrow(std::current_exception());
1446 return boost::make_ready_future(protocol::ClientMessage(0));
// Creates a ClientInvocation around `request` (moved into a shared_ptr) and
// returns invocation->invoke(). How `uuid` is passed to create() is not
// visible here. An exception::iexception is re-thrown through
// util::exception_util::rethrow; the ready future after it presumably only
// satisfies the compiler's return-path check -- confirm.
1450 boost::future<protocol::ClientMessage>
1451 SerializingProxy::invoke_on_member(protocol::ClientMessage& request,
1452 boost::uuids::uuid uuid)
1455 auto invocation = spi::impl::ClientInvocation::create(
1457 std::make_shared<protocol::ClientMessage>(std::move(request)),
1460 return invocation->invoke();
1461 }
catch (exception::iexception&) {
1462 util::exception_util::rethrow(std::current_exception());
1463 return boost::make_ready_future(protocol::ClientMessage(0));
// Overload taking only the request: decodes the response as an optional
// serialized value via decode_optional_var_sized. The invocation expression
// passed to the decoder is not visible in this listing.
1468 boost::future<boost::optional<serialization::pimpl::data>>
1469 SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request)
1471 return decode_optional_var_sized<serialization::pimpl::data>(
// Overload yielding an optional map::data_entry_view: invokes `request` on
// the owner of `key` and decodes the response with decode_optional_var_sized.
1476 boost::future<boost::optional<map::data_entry_view>>
1477 SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1478 const serialization::pimpl::data& key)
1480 return decode_optional_var_sized<map::data_entry_view>(
1481 invoke_on_key_owner(request, key));
// Partition-targeted overload: invokes `request` on partition_id and decodes
// the response as an optional serialized value. The partition_id parameter
// declaration line is not visible in this listing.
1485 boost::future<boost::optional<serialization::pimpl::data>>
1486 SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1489 return decode_optional_var_sized<serialization::pimpl::data>(
1490 invoke_on_partition(request, partition_id));
// Key-targeted overload: invokes `request` on the owner of `key` and decodes
// the response as an optional serialized value.
1494 boost::future<boost::optional<serialization::pimpl::data>>
1495 SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1496 const serialization::pimpl::data& key)
1498 return decode_optional_var_sized<serialization::pimpl::data>(
1499 invoke_on_key_owner(request, key));
// Forwards service name, object name and context to ProxyImpl. Any further
// member initializers are not visible in this listing.
1502 PartitionSpecificClientProxy::PartitionSpecificClientProxy(
1503 const std::string& service_name,
1504 const std::string& object_name,
1505 spi::ClientContext* context)
1506 : ProxyImpl(service_name, object_name, context)
// Computes and caches partition_id_: derives the partition key from name_
// with StringPartitioningStrategy::get_partition_key, serializes it, and asks
// the partition service for its partition id.
1511 PartitionSpecificClientProxy::on_initialize()
1513 std::string partitionKey = internal::partition::strategy::
1514 StringPartitioningStrategy::get_partition_key(name_);
1515 partition_id_ = get_context().get_partition_service().get_partition_id(
1516 to_data<std::string>(partitionKey));
// Constructs the map proxy under service name "hz:impl:mapService".
1519 IMapImpl::IMapImpl(
const std::string& instance_name,
1520 spi::ClientContext* context)
1521 : ProxyImpl(
"hz:impl:mapService", instance_name, context)
// Encodes a map contains-key request tagged with the current thread id and
// invokes it keyed on `key`, returning invoke_and_get_future<bool>.
1525 IMapImpl::contains_key(
const serialization::pimpl::data& key)
1527 auto request = protocol::codec::map_containskey_encode(
1528 get_name(), key, util::get_current_thread_id());
1529 return invoke_and_get_future<bool>(request, key);
// Encodes a map contains-value request for `value` and invokes it without a
// key or partition argument, returning invoke_and_get_future<bool>.
1533 IMapImpl::contains_value(
const serialization::pimpl::data& value)
1535 auto request = protocol::codec::map_containsvalue_encode(get_name(), value);
1536 return invoke_and_get_future<bool>(request);
// Encodes a map get request for `key`, tagged with the current thread id; the
// future holds an optional serialized value. The arguments passed to
// invoke_and_get_future are not visible in this listing.
1539 boost::future<boost::optional<serialization::pimpl::data>>
1540 IMapImpl::get_data(
const serialization::pimpl::data& key)
1542 auto request = protocol::codec::map_get_encode(
1543 get_name(), key, util::get_current_thread_id());
1544 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
// Encodes a map remove request for `key`, tagged with the current thread id;
// the future holds an optional serialized value. The arguments passed to
// invoke_and_get_future are not visible in this listing.
1548 boost::future<boost::optional<serialization::pimpl::data>>
1549 IMapImpl::remove_data(
const serialization::pimpl::data& key)
1551 auto request = protocol::codec::map_remove_encode(
1552 get_name(), key, util::get_current_thread_id());
1553 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
// Encodes a map remove-if-same request for the (key, value) pair, tagged with
// the current thread id, and invokes it keyed on `key`, returning
// invoke_and_get_future<bool>.
1558 IMapImpl::remove(
const serialization::pimpl::data& key,
1559 const serialization::pimpl::data& value)
1561 auto request = protocol::codec::map_removeifsame_encode(
1562 get_name(), key, value, util::get_current_thread_id());
1563 return invoke_and_get_future<bool>(request, key);
// Encodes a map remove-all request with the serialized predicate and invokes
// it via invoke(request), returning the raw response message future.
1566 boost::future<protocol::ClientMessage>
1567 IMapImpl::remove_all(
const serialization::pimpl::data& predicate_data)
1570 protocol::codec::map_removeall_encode(get_name(), predicate_data);
1571 return invoke(request);
// Encodes a map delete request for `key`, tagged with the current thread id,
// and invokes it on the key's partition, returning the raw response future.
1574 boost::future<protocol::ClientMessage>
1575 IMapImpl::delete_entry(
const serialization::pimpl::data& key)
1577 auto request = protocol::codec::map_delete_encode(
1578 get_name(), key, util::get_current_thread_id());
1579 return invoke_on_partition(request, get_partition_id(key));
// Map flush operation (the function-name line is not visible in this listing;
// presumably IMapImpl::flush -- confirm). Encodes a map flush request for
// get_name() and invokes it via invoke(request).
1582 boost::future<protocol::ClientMessage>
1585 auto request = protocol::codec::map_flush_encode(get_name());
1586 return invoke(request);
// Encodes a map try-remove request carrying the current thread id and the
// timeout as a millisecond count, then invokes it keyed on `key`, returning
// invoke_and_get_future<bool>. Leading encode arguments are not visible here.
1590 IMapImpl::try_remove(
const serialization::pimpl::data& key,
1591 std::chrono::milliseconds timeout)
1593 auto request = protocol::codec::map_tryremove_encode(
1596 util::get_current_thread_id(),
1597 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1599 return invoke_and_get_future<bool>(request, key);
// Encodes a map try-put request carrying the current thread id and the
// timeout as a millisecond count, then invokes it keyed on `key`, returning
// invoke_and_get_future<bool>. Leading encode arguments are not visible here.
1603 IMapImpl::try_put(
const serialization::pimpl::data& key,
1604 const serialization::pimpl::data& value,
1605 std::chrono::milliseconds timeout)
1607 auto request = protocol::codec::map_tryput_encode(
1611 util::get_current_thread_id(),
1612 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1614 return invoke_and_get_future<bool>(request, key);
// Encodes a map put request carrying the current thread id and `ttl` as a
// millisecond count; the future holds an optional serialized value. Leading
// encode arguments and the invoke_and_get_future arguments are not visible.
1617 boost::future<boost::optional<serialization::pimpl::data>>
1618 IMapImpl::put_data(
const serialization::pimpl::data& key,
1619 const serialization::pimpl::data& value,
1620 std::chrono::milliseconds ttl)
1622 auto request = protocol::codec::map_put_encode(
1626 util::get_current_thread_id(),
1627 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1628 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
// Encodes a map put-transient request carrying the current thread id and
// `ttl` as a millisecond count, then invokes it on the key's partition and
// returns the raw response future. Leading encode arguments are not visible.
1632 boost::future<protocol::ClientMessage>
1633 IMapImpl::put_transient(
const serialization::pimpl::data& key,
1634 const serialization::pimpl::data& value,
1635 std::chrono::milliseconds ttl)
1637 auto request = protocol::codec::map_puttransient_encode(
1641 util::get_current_thread_id(),
1642 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1643 return invoke_on_partition(request, get_partition_id(key));
// Encodes a map put-if-absent request carrying the current thread id and
// `ttl` as a millisecond count; the future holds an optional serialized
// value. Leading encode arguments and the invoke arguments are not visible.
1646 boost::future<boost::optional<serialization::pimpl::data>>
1647 IMapImpl::put_if_absent_data(
const serialization::pimpl::data& key,
1648 const serialization::pimpl::data& value,
1649 std::chrono::milliseconds ttl)
1651 auto request = protocol::codec::map_putifabsent_encode(
1655 util::get_current_thread_id(),
1656 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1657 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
// Encodes a map replace-if-same request for (key, old_value, new_value),
// tagged with the current thread id, and invokes it keyed on `key`, returning
// invoke_and_get_future<bool>.
1662 IMapImpl::replace(
const serialization::pimpl::data& key,
1663 const serialization::pimpl::data& old_value,
1664 const serialization::pimpl::data& new_value)
1666 auto request = protocol::codec::map_replaceifsame_encode(
1667 get_name(), key, old_value, new_value, util::get_current_thread_id());
1669 return invoke_and_get_future<bool>(request, key);
// Encodes a map replace request for (key, value), tagged with the current
// thread id; the future holds an optional serialized value. The arguments
// passed to invoke_and_get_future are not visible in this listing.
1672 boost::future<boost::optional<serialization::pimpl::data>>
1673 IMapImpl::replace_data(
const serialization::pimpl::data& key,
1674 const serialization::pimpl::data& value)
1676 auto request = protocol::codec::map_replace_encode(
1677 get_name(), key, value, util::get_current_thread_id());
1679 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
// Encodes a map set request carrying the current thread id and `ttl` as a
// millisecond count, then invokes it on the key's partition and returns the
// raw response future. Leading encode arguments are not visible here.
1683 boost::future<protocol::ClientMessage>
1684 IMapImpl::set(
const serialization::pimpl::data& key,
1685 const serialization::pimpl::data& value,
1686 std::chrono::milliseconds ttl)
1688 auto request = protocol::codec::map_set_encode(
1692 util::get_current_thread_id(),
1693 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1694 return invoke_on_partition(request, get_partition_id(key));
// Convenience overload: forwards to lock(key, lease_time) with a lease time
// of -1 ms (presumably meaning "no lease expiry" -- confirm against the
// protocol definition).
1697 boost::future<protocol::ClientMessage>
1698 IMapImpl::lock(
const serialization::pimpl::data& key)
1700 return lock(key, std::chrono::milliseconds(-1));
// Encodes a map lock request carrying the current thread id, the lease time
// as a millisecond count and a fresh reference id from
// lock_reference_id_generator_, then invokes it on the key's partition.
// Leading encode arguments are not visible in this listing.
1703 boost::future<protocol::ClientMessage>
1704 IMapImpl::lock(
const serialization::pimpl::data& key,
1705 std::chrono::milliseconds lease_time)
1707 auto request = protocol::codec::map_lock_encode(
1710 util::get_current_thread_id(),
1711 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1712 lock_reference_id_generator_->get_next_reference_id());
1713 return invoke_on_partition(request, get_partition_id(key));
// Encodes a map is-locked request for `key` and invokes it keyed on `key`,
// returning invoke_and_get_future<bool>.
1717 IMapImpl::is_locked(
const serialization::pimpl::data& key)
1719 auto request = protocol::codec::map_islocked_encode(get_name(), key);
1721 return invoke_and_get_future<bool>(request, key);
// Convenience overload: forwards to try_lock(key, timeout, lease_time) with a
// lease time of -1 ms (presumably meaning "no lease expiry" -- confirm).
1725 IMapImpl::try_lock(
const serialization::pimpl::data& key,
1726 std::chrono::milliseconds timeout)
1728 return try_lock(key, timeout, std::chrono::milliseconds(-1));
// Encodes a map try-lock request carrying the current thread id and a fresh
// lock reference id, then invokes it keyed on `key`, returning
// invoke_and_get_future<bool>. Note that the encoder receives lease_time
// before timeout, the reverse of this function's parameter order. Leading
// encode arguments are not visible in this listing.
1732 IMapImpl::try_lock(
const serialization::pimpl::data& key,
1733 std::chrono::milliseconds timeout,
1734 std::chrono::milliseconds lease_time)
1736 auto request = protocol::codec::map_trylock_encode(
1739 util::get_current_thread_id(),
1740 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1741 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
1742 lock_reference_id_generator_->get_next_reference_id());
1743 return invoke_and_get_future<bool>(request, key);
// Encodes a map unlock request carrying the current thread id and a fresh
// lock reference id, then invokes it on the key's partition. Leading encode
// arguments are not visible in this listing.
1746 boost::future<protocol::ClientMessage>
1747 IMapImpl::unlock(
const serialization::pimpl::data& key)
1749 auto request = protocol::codec::map_unlock_encode(
1752 util::get_current_thread_id(),
1753 lock_reference_id_generator_->get_next_reference_id());
1754 return invoke_on_partition(request, get_partition_id(key));
// Encodes a map force-unlock request for `key` with a fresh lock reference id
// (no thread id is passed, unlike unlock) and invokes it on the key's
// partition.
1757 boost::future<protocol::ClientMessage>
1758 IMapImpl::force_unlock(
const serialization::pimpl::data& key)
1760 auto request = protocol::codec::map_forceunlock_encode(
1761 get_name(), key, lock_reference_id_generator_->get_next_reference_id());
1762 return invoke_on_partition(request, get_partition_id(key));
// Encodes a map add-interceptor request with the serialized interceptor and
// invokes it; the future holds a std::string decoded from the response.
1765 boost::future<std::string>
1766 IMapImpl::add_interceptor(
const serialization::pimpl::data& interceptor)
1769 protocol::codec::map_addinterceptor_encode(get_name(), interceptor);
1770 return invoke_and_get_future<std::string>(request);
// Registers an entry listener for the whole map: builds the plain entry
// listener codec from include_value and listener_flags and hands it, together
// with the (moved) handler, to register_listener. The future holds the
// registration uuid. The include_value parameter line is not visible here.
1775 boost::future<boost::uuids::uuid>
1776 IMapImpl::add_entry_listener(
1777 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1779 int32_t listener_flags)
1781 return register_listener(
1782 create_map_entry_listener_codec(include_value, listener_flags),
1783 std::move(entry_event_handler));
// Registers an entry listener filtered by a predicate: builds the
// predicate-based codec (the predicate is moved in) and hands it, together
// with the (moved) handler, to register_listener. The future holds the
// registration uuid. The include_value parameter line is not visible here.
1786 boost::future<boost::uuids::uuid>
1787 IMapImpl::add_entry_listener(
1788 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1789 serialization::pimpl::data&& predicate,
1791 int32_t listener_flags)
1793 return register_listener(
1794 create_map_entry_listener_codec(
1795 include_value, std::move(predicate), listener_flags),
1796 std::move(entry_event_handler));
// Removes an entry listener by delegating to the client listener service's
// deregister_listener; the argument line is not visible in this listing.
1800 IMapImpl::remove_entry_listener(boost::uuids::uuid registration_id)
1802 return get_context().get_client_listener_service().deregister_listener(
// Registers an entry listener bound to a single key: builds the key-based
// codec (the key is moved in) and hands it, together with the (moved)
// handler, to register_listener. The future holds the registration uuid.
// The include_value parameter line is not visible in this listing.
1806 boost::future<boost::uuids::uuid>
1807 IMapImpl::add_entry_listener(
1808 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1810 serialization::pimpl::data&& key,
1811 int32_t listener_flags)
1813 return register_listener(create_map_entry_listener_codec(
1814 include_value, listener_flags, std::move(key)),
1815 std::move(entry_event_handler));
// Encodes a map get-entry-view request for `key`, tagged with the current
// thread id; the future holds an optional map::data_entry_view. The trailing
// argument(s) of invoke_and_get_future are not visible in this listing.
1818 boost::future<boost::optional<map::data_entry_view>>
1819 IMapImpl::get_entry_view_data(
const serialization::pimpl::data& key)
1821 auto request = protocol::codec::map_getentryview_encode(
1822 get_name(), key, util::get_current_thread_id());
1823 return invoke_and_get_future<boost::optional<map::data_entry_view>>(request,
// Encodes a map evict request for `key`, tagged with the current thread id,
// and invokes it keyed on `key`, returning invoke_and_get_future<bool>.
1828 IMapImpl::evict(
const serialization::pimpl::data& key)
1830 auto request = protocol::codec::map_evict_encode(
1831 get_name(), key, util::get_current_thread_id());
1832 return invoke_and_get_future<bool>(request, key);
// Encodes a map evict-all request for get_name() and invokes it via
// invoke(request), returning the raw response future.
1835 boost::future<protocol::ClientMessage>
1836 IMapImpl::evict_all()
1838 auto request = protocol::codec::map_evictall_encode(get_name());
1839 return invoke(request);
// Encodes a map get-all request for `keys` and invokes it on the given
// partition_id; the future holds the resulting EntryVector. The caller
// supplies the partition, so the keys are presumably expected to belong to
// it -- confirm against callers.
1842 boost::future<EntryVector>
1843 IMapImpl::get_all_data(
int partition_id,
1844 const std::vector<serialization::pimpl::data>& keys)
1846 auto request = protocol::codec::map_getall_encode(get_name(), keys);
1847 return invoke_and_get_future<EntryVector>(request, partition_id);
// Encodes a map key-set request for get_name(); the future holds the keys in
// serialized form. The arguments passed to invoke_and_get_future are not
// visible in this listing.
1850 boost::future<std::vector<serialization::pimpl::data>>
1851 IMapImpl::key_set_data()
1853 auto request = protocol::codec::map_keyset_encode(get_name());
1854 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Predicate overload: encodes a map key-set-with-predicate request; the
// future holds the matching keys in serialized form. The arguments passed to
// invoke_and_get_future are not visible in this listing.
1858 boost::future<std::vector<serialization::pimpl::data>>
1859 IMapImpl::key_set_data(
const serialization::pimpl::data& predicate)
1862 protocol::codec::map_keysetwithpredicate_encode(get_name(), predicate);
1863 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Paging-predicate key query: encodes a map key-set-with-paging-predicate
// request, invokes it, and in a synchronous continuation converts the
// response with get_paging_predicate_response into a pair of serialized keys
// and query::anchor_data_list. The first line of the return type is not
// visible in this listing.
1868 std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1869 IMapImpl::key_set_for_paging_predicate_data(
1870 protocol::codec::holder::paging_predicate_holder
const& predicate)
1872 auto request = protocol::codec::map_keysetwithpagingpredicate_encode(
1873 get_name(), predicate);
1874 return invoke(request).then(
1875 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1876 return get_paging_predicate_response<
1877 std::vector<serialization::pimpl::data>>(std::move(f));
// Encodes a map entry-set request for get_name() and invokes it; the future
// holds the resulting EntryVector.
1881 boost::future<EntryVector>
1882 IMapImpl::entry_set_data()
1884 auto request = protocol::codec::map_entryset_encode(get_name());
1885 return invoke_and_get_future<EntryVector>(request);
// Predicate overload: encodes a map entries-with-predicate request and
// invokes it; the future holds the matching entries as an EntryVector.
1888 boost::future<EntryVector>
1889 IMapImpl::entry_set_data(
const serialization::pimpl::data& predicate)
1892 protocol::codec::map_entrieswithpredicate_encode(get_name(), predicate);
1893 return invoke_and_get_future<EntryVector>(request);
// Paging-predicate entry query: encodes a map entries-with-paging-predicate
// request, invokes it, and in a synchronous continuation converts the
// response with get_paging_predicate_response<EntryVector> into a pair of
// entries and query::anchor_data_list.
1896 boost::future<std::pair<EntryVector, query::anchor_data_list>>
1897 IMapImpl::entry_set_for_paging_predicate_data(
1898 protocol::codec::holder::paging_predicate_holder
const& predicate)
1900 auto request = protocol::codec::map_entrieswithpagingpredicate_encode(
1901 get_name(), predicate);
1902 return invoke(request).then(
1903 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1904 return get_paging_predicate_response<EntryVector>(std::move(f));
// Encodes a map values request for get_name(); the future holds the values in
// serialized form. The arguments passed to invoke_and_get_future are not
// visible in this listing.
1908 boost::future<std::vector<serialization::pimpl::data>>
1909 IMapImpl::values_data()
1911 auto request = protocol::codec::map_values_encode(get_name());
1912 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Predicate overload: encodes a map values-with-predicate request; the future
// holds the matching values in serialized form. The arguments passed to
// invoke_and_get_future are not visible in this listing.
1916 boost::future<std::vector<serialization::pimpl::data>>
1917 IMapImpl::values_data(
const serialization::pimpl::data& predicate)
1920 protocol::codec::map_valueswithpredicate_encode(get_name(), predicate);
1921 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
// Paging-predicate value query: encodes a map values-with-paging-predicate
// request, invokes it, and in a synchronous continuation converts the
// response with get_paging_predicate_response into a pair of serialized
// values and query::anchor_data_list. The first line of the return type is
// not visible in this listing.
1926 std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1927 IMapImpl::values_for_paging_predicate_data(
1928 protocol::codec::holder::paging_predicate_holder
const& predicate)
1930 auto request = protocol::codec::map_valueswithpagingpredicate_encode(
1931 get_name(), predicate);
1932 return invoke(request).then(
1933 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1934 return get_paging_predicate_response<
1935 std::vector<serialization::pimpl::data>>(std::move(f));
// Encodes a map add-index request from the given index configuration and
// invokes it via invoke(request), returning the raw response future.
1939 boost::future<protocol::ClientMessage>
1940 IMapImpl::add_index_data(
const config::index_config& config)
1942 auto request = protocol::codec::map_addindex_encode(get_name(), config);
1943 return invoke(request);
// Body of a map size query (the function header is not visible in this
// listing; presumably IMapImpl::size -- confirm). Encodes a map size request
// and invokes it via invoke_and_get_future<int>.
1949 auto request = protocol::codec::map_size_encode(get_name());
1950 return invoke_and_get_future<int>(request);
// Encodes a map is-empty request for get_name() and invokes it, returning
// invoke_and_get_future<bool>.
1954 IMapImpl::is_empty()
1956 auto request = protocol::codec::map_isempty_encode(get_name());
1957 return invoke_and_get_future<bool>(request);
// Encodes a map put-all request for `entries` and invokes it on the given
// partition_id. NOTE(review): the literal `true` passed as the encoder's last
// argument is not explained by the visible code -- confirm its meaning
// against the codec definition.
1960 boost::future<protocol::ClientMessage>
1961 IMapImpl::put_all_data(
int partition_id,
const EntryVector& entries)
1964 protocol::codec::map_putall_encode(get_name(), entries,
true);
1965 return invoke_on_partition(request, partition_id);
// Encodes a map clear request for get_name() and invokes it via
// invoke(request), returning the raw response future.
1968 boost::future<protocol::ClientMessage>
1969 IMapImpl::clear_data()
1971 auto request = protocol::codec::map_clear_encode(get_name());
1972 return invoke(request);
// Runs an entry processor on one key: encodes a map execute-on-key request
// (the encoder takes processor before key), tagged with the current thread
// id, and invokes it on the key's partition. The future holds an optional
// serialized result.
1975 boost::future<boost::optional<serialization::pimpl::data>>
1976 IMapImpl::execute_on_key_data(
const serialization::pimpl::data& key,
1977 const serialization::pimpl::data& processor)
1979 auto request = protocol::codec::map_executeonkey_encode(
1980 get_name(), processor, key, util::get_current_thread_id());
1981 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1982 request, get_partition_id(key));
// Submits an entry processor for one key: encodes a map submit-to-key request
// (the encoder takes processor before key), tagged with the current thread
// id, invokes it on the key's partition, and in a synchronous continuation
// reads a nullable serialized value from the response message. The lines
// that obtain `msg` from the future are not visible in this listing.
1985 boost::future<boost::optional<serialization::pimpl::data>>
1986 IMapImpl::submit_to_key_data(
const serialization::pimpl::data& key,
1987 const serialization::pimpl::data& processor)
1989 auto request = protocol::codec::map_submittokey_encode(
1990 get_name(), processor, key, util::get_current_thread_id());
1991 return invoke_on_partition(request, get_partition_id(key))
1992 .then(boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
1995 return msg.get_nullable<serialization::pimpl::data>();
// Runs an entry processor on a set of keys: encodes a map execute-on-keys
// request (the encoder takes processor before keys) and invokes it; the
// future holds the resulting EntryVector.
1999 boost::future<EntryVector>
2000 IMapImpl::execute_on_keys_data(
2001 const std::vector<serialization::pimpl::data>& keys,
2002 const serialization::pimpl::data& processor)
2005 protocol::codec::map_executeonkeys_encode(get_name(), processor, keys);
2006 return invoke_and_get_future<EntryVector>(request);
// Encodes a map remove-interceptor request for the given interceptor id and
// invokes it via invoke(request), returning the raw response future.
2009 boost::future<protocol::ClientMessage>
2010 IMapImpl::remove_interceptor(
const std::string&
id)
2013 protocol::codec::map_removeinterceptor_encode(get_name(),
id);
2014 return invoke(request);
// Runs an entry processor without a predicate: encodes a map
// execute-on-all-keys request and invokes it; the future holds the resulting
// EntryVector.
2017 boost::future<EntryVector>
2018 IMapImpl::execute_on_entries_data(
2019 const serialization::pimpl::data& entry_processor)
2022 protocol::codec::map_executeonallkeys_encode(get_name(), entry_processor);
2023 return invoke_and_get_future<EntryVector>(request);
// Predicate overload: encodes a map execute-with-predicate request from the
// entry processor and predicate and invokes it; the future holds the
// resulting EntryVector.
2026 boost::future<EntryVector>
2027 IMapImpl::execute_on_entries_data(
2028 const serialization::pimpl::data& entry_processor,
2029 const serialization::pimpl::data& predicate)
2031 auto request = protocol::codec::map_executewithpredicate_encode(
2032 get_name(), entry_processor, predicate);
2033 return invoke_and_get_future<EntryVector>(request);
// Creates the predicate-based entry listener codec: a
// MapEntryListenerWithPredicateMessageCodec built from get_name(),
// include_value, listener_flags and the moved predicate. The include_value
// parameter line is not visible in this listing.
2036 std::shared_ptr<spi::impl::ListenerMessageCodec>
2037 IMapImpl::create_map_entry_listener_codec(
2039 serialization::pimpl::data&& predicate,
2040 int32_t listener_flags)
2042 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2043 new MapEntryListenerWithPredicateMessageCodec(
2044 get_name(), include_value, listener_flags, std::move(predicate)));
// Creates the plain entry listener codec: a MapEntryListenerMessageCodec
// built from get_name(), include_value and listener_flags.
2047 std::shared_ptr<spi::impl::ListenerMessageCodec>
2048 IMapImpl::create_map_entry_listener_codec(
bool include_value,
2049 int32_t listener_flags)
2051 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2052 new MapEntryListenerMessageCodec(
2053 get_name(), include_value, listener_flags));
// Creates the key-bound entry listener codec: a MapEntryListenerToKeyCodec
// built from get_name(), include_value, listener_flags and the moved key.
2056 std::shared_ptr<spi::impl::ListenerMessageCodec>
2057 IMapImpl::create_map_entry_listener_codec(
bool include_value,
2058 int32_t listener_flags,
2059 serialization::pimpl::data&& key)
2061 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2062 new MapEntryListenerToKeyCodec(
2063 get_name(), include_value, listener_flags, std::move(key)));
// Runs the base-class initialization, then caches the lock reference id
// generator obtained from the client context for use by the lock operations.
2067 IMapImpl::on_initialize()
2069 ProxyImpl::on_initialize();
2070 lock_reference_id_generator_ =
2071 get_context().get_lock_reference_id_generator();
// Stores the (moved) map name, the include_value flag and the listener flags
// for later request encoding. Leading parameter lines are not visible here.
2074 IMapImpl::MapEntryListenerMessageCodec::MapEntryListenerMessageCodec(
2077 int32_t listener_flags)
2078 : name_(std::move(name))
2079 , include_value_(include_value)
2080 , listener_flags_(listener_flags)
// Builds the map add-entry-listener request from name_, include_value_,
// listener_flags_ (cast to int32_t) and the caller-supplied local_only flag.
2083 protocol::ClientMessage
2084 IMapImpl::MapEntryListenerMessageCodec::encode_add_request(
2085 bool local_only)
const
2087 return protocol::codec::map_addentrylistener_encode(
2088 name_, include_value_,
static_cast<int32_t
>(listener_flags_), local_only);
// Builds the map remove-entry-listener request for name_ and the given
// registration id.
2091 protocol::ClientMessage
2092 IMapImpl::MapEntryListenerMessageCodec::encode_remove_request(
2093 boost::uuids::uuid real_registration_id)
const
2095 return protocol::codec::map_removeentrylistener_encode(
2096 name_, real_registration_id);
// Builds the map add-entry-listener-to-key request; listener_flags_ is passed
// cast to int32_t. The other encoder arguments are not visible in this
// listing.
2099 protocol::ClientMessage
2100 IMapImpl::MapEntryListenerToKeyCodec::encode_add_request(
bool local_only)
const
2102 return protocol::codec::map_addentrylistenertokey_encode(
2106 static_cast<int32_t
>(listener_flags_),
// Builds the map remove-entry-listener request for name_ and the given
// registration id (same encoder as the non-key listener codec).
2110 protocol::ClientMessage
2111 IMapImpl::MapEntryListenerToKeyCodec::encode_remove_request(
2112 boost::uuids::uuid real_registration_id)
const
2114 return protocol::codec::map_removeentrylistener_encode(
2115 name_, real_registration_id);
// Stores the (moved) map name, include_value flag, listener flags and the
// serialized key, which is taken by value and moved into key_. Leading
// parameter lines are not visible in this listing.
2118 IMapImpl::MapEntryListenerToKeyCodec::MapEntryListenerToKeyCodec(
2121 int32_t listener_flags,
2122 serialization::pimpl::data key)
2123 : name_(std::move(name))
2124 , include_value_(include_value)
2125 , listener_flags_(listener_flags)
2126 , key_(std::move(key))
// Stores the (moved) map name, include_value flag, listener flags and the
// serialized predicate, which is taken by rvalue reference and moved into
// predicate_. Leading parameter lines are not visible in this listing.
2129 IMapImpl::MapEntryListenerWithPredicateMessageCodec::
2130 MapEntryListenerWithPredicateMessageCodec(
2133 int32_t listener_flags,
2134 serialization::pimpl::data&& predicate)
2135 : name_(std::move(name))
2136 , include_value_(include_value)
2137 , listener_flags_(listener_flags)
2138 , predicate_(std::move(predicate))
// Builds the map add-entry-listener-with-predicate request; listener_flags_
// is passed cast to int32_t. The other encoder arguments are not visible in
// this listing.
2141 protocol::ClientMessage
2142 IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_add_request(
2143 bool local_only)
const
2145 return protocol::codec::map_addentrylistenerwithpredicate_encode(
2149 static_cast<int32_t
>(listener_flags_),
// Builds the map remove-entry-listener request for name_ and the given
// registration id (same encoder as the other entry listener codecs).
2153 protocol::ClientMessage
2154 IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_remove_request(
2155 boost::uuids::uuid real_registration_id)
const
2157 return protocol::codec::map_removeentrylistener_encode(
2158 name_, real_registration_id);
// Constructs the transactional queue proxy by forwarding
// iqueue::SERVICE_NAME, the queue name and the transaction proxy to
// TransactionalObject.
2161 TransactionalQueueImpl::TransactionalQueueImpl(
2162 const std::string& name,
2163 txn::TransactionProxy& transaction_proxy)
2164 : TransactionalObject(iqueue::SERVICE_NAME, name, transaction_proxy)
// Encodes a transactional queue offer request carrying the transaction id,
// the current thread id and the timeout as a millisecond count, then invokes
// it via invoke_and_get_future<bool>. The remaining encode arguments are not
// visible in this listing.
2168 TransactionalQueueImpl::offer(
const serialization::pimpl::data& e,
2169 std::chrono::milliseconds timeout)
2171 auto request = protocol::codec::transactionalqueue_offer_encode(
2173 get_transaction_id(),
2174 util::get_current_thread_id(),
2176 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
2178 return invoke_and_get_future<bool>(request);
// Encodes a transactional queue poll request carrying the transaction id, the
// current thread id and the timeout as a millisecond count; the future holds
// an optional serialized element. The first encode argument and the
// invoke_and_get_future arguments are not visible in this listing.
2181 boost::future<boost::optional<serialization::pimpl::data>>
2182 TransactionalQueueImpl::poll_data(std::chrono::milliseconds timeout)
2184 auto request = protocol::codec::transactionalqueue_poll_encode(
2186 get_transaction_id(),
2187 util::get_current_thread_id(),
2188 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
2190 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
// Encodes a transactional queue size request from the queue name, transaction
// id and current thread id, and invokes it via invoke_and_get_future<int>.
2195 TransactionalQueueImpl::size()
2197 auto request = protocol::codec::transactionalqueue_size_encode(
2198 get_name(), get_transaction_id(), util::get_current_thread_id());
2200 return invoke_and_get_future<int>(request);
// Constructs the set proxy under iset::SERVICE_NAME and caches partition_id_,
// computed from a serialized std::string. The argument passed to to_data is
// not visible here (presumably the set name -- confirm).
2203 ISetImpl::ISetImpl(
const std::string& instance_name,
2204 spi::ClientContext* client_context)
2205 : ProxyImpl(iset::SERVICE_NAME, instance_name, client_context)
2207 serialization::pimpl::data key_data =
2208 get_context().get_serialization_service().to_data<std::string>(
2210 partition_id_ = get_partition_id(key_data);
// Removes an item listener by delegating to the client listener service's
// deregister_listener; the argument line is not visible in this listing.
2214 ISetImpl::remove_item_listener(boost::uuids::uuid registration_id)
2216 return get_context().get_client_listener_service().deregister_listener(
// Body of a set size query (the function header is not visible in this
// listing; presumably ISetImpl::size -- confirm). Encodes a set size request
// and invokes it on partition_id_ via invoke_and_get_future<int>.
2223 auto request = protocol::codec::set_size_encode(get_name());
2224 return invoke_and_get_future<int>(request, partition_id_);
// Encodes a set is-empty request for get_name() and invokes it on
// partition_id_, returning invoke_and_get_future<bool>.
2228 ISetImpl::is_empty()
2230 auto request = protocol::codec::set_isempty_encode(get_name());
2231 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a set contains request for `element` and invokes it on
// partition_id_, returning invoke_and_get_future<bool>.
2235 ISetImpl::contains(
const serialization::pimpl::data& element)
2237 auto request = protocol::codec::set_contains_encode(get_name(), element);
2238 return invoke_and_get_future<bool>(request, partition_id_);
// Fetches the set contents in serialized form: encodes a set get-all request
// for get_name() and invokes it on partition_id_.
2241 boost::future<std::vector<serialization::pimpl::data>>
2242 ISetImpl::to_array_data()
2244 auto request = protocol::codec::set_getall_encode(get_name());
2245 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
2246 request, partition_id_);
// Encodes a set add request for `element` and invokes it on partition_id_,
// returning invoke_and_get_future<bool>.
2250 ISetImpl::add(
const serialization::pimpl::data& element)
2252 auto request = protocol::codec::set_add_encode(get_name(), element);
2253 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a set remove request for `element` and invokes it on partition_id_,
// returning invoke_and_get_future<bool>.
2257 ISetImpl::remove(
const serialization::pimpl::data& element)
2259 auto request = protocol::codec::set_remove_encode(get_name(), element);
2260 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a set contains-all request for `elements` and invokes it on
// partition_id_, returning invoke_and_get_future<bool>.
2264 ISetImpl::contains_all(
const std::vector<serialization::pimpl::data>& elements)
2267 protocol::codec::set_containsall_encode(get_name(), elements);
2268 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a set add-all request for `elements` and invokes it on
// partition_id_, returning invoke_and_get_future<bool>.
2272 ISetImpl::add_all(
const std::vector<serialization::pimpl::data>& elements)
2274 auto request = protocol::codec::set_addall_encode(get_name(), elements);
2275 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a set compare-and-remove-all request for `elements` and invokes it
// on partition_id_, returning invoke_and_get_future<bool>.
2279 ISetImpl::remove_all(
const std::vector<serialization::pimpl::data>& elements)
2282 protocol::codec::set_compareandremoveall_encode(get_name(), elements);
2283 return invoke_and_get_future<bool>(request, partition_id_);
2287 ISetImpl::retain_all(
const std::vector<serialization::pimpl::data>& elements)
2290 protocol::codec::set_compareandretainall_encode(get_name(), elements);
2291 return invoke_and_get_future<bool>(request, partition_id_);
2297 auto request = protocol::codec::set_clear_encode(get_name());
2298 return to_void_future(invoke_on_partition(request, partition_id_));
// Creates the codec used for item-listener registration on this set.
// A new SetListenerMessageCodec is built from the proxy name and the
// include_value flag, and handed back as a shared_ptr to
// spi::impl::ListenerMessageCodec.
2301 std::shared_ptr<spi::impl::ListenerMessageCodec>
2302 ISetImpl::create_item_listener_codec(
bool include_value)
2304 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2305 new SetListenerMessageCodec(get_name(), include_value));
2308 ISetImpl::SetListenerMessageCodec::SetListenerMessageCodec(std::string name,
2310 : name_(std::move(name))
2311 , include_value_(include_value)
// Encodes the set add-listener request.
// Uses the name_ and include_value_ captured at construction together with the
// caller-supplied local_only flag; returns the encoded ClientMessage.
2314 protocol::ClientMessage
2315 ISetImpl::SetListenerMessageCodec::encode_add_request(
bool local_only)
const
2317 return protocol::codec::set_addlistener_encode(
2318 name_, include_value_, local_only);
// Encodes the set remove-listener request for the stored name_ and the given
// registration id; returns the encoded ClientMessage.
// NOTE(review): real_registration_id is presumably the server-side id rather
// than the user-facing one -- confirm against the listener service.
2321 protocol::ClientMessage
2322 ISetImpl::SetListenerMessageCodec::encode_remove_request(
2323 boost::uuids::uuid real_registration_id)
const
2325 return protocol::codec::set_removelistener_encode(name_,
2326 real_registration_id);
// Constructs the topic proxy.
// The base ProxyImpl is initialised with the literal service name
// "hz:impl:topicService", and partition_id_ is computed once from the
// serialized instance name so later calls can reuse it.
2329 ITopicImpl::ITopicImpl(
const std::string& instance_name,
2330 spi::ClientContext* context)
2331 : proxy::ProxyImpl(
"hz:impl:topicService", instance_name, context)
2332 , partition_id_(get_partition_id(to_data(instance_name)))
2336 ITopicImpl::publish(
const serialization::pimpl::data& data)
2338 auto request = protocol::codec::topic_publish_encode(get_name(), data);
2339 return to_void_future(invoke_on_partition(request, partition_id_));
// Registers a message listener for this topic.
// A fresh listener codec is created and passed to register_listener together
// with the handler, which is moved (the caller's shared_ptr copy is consumed).
// Returns a future of a uuid -- presumably the registration id to pass to
// remove_message_listener; confirm against register_listener.
2342 boost::future<boost::uuids::uuid>
2343 ITopicImpl::add_message_listener(
2344 std::shared_ptr<impl::BaseEventHandler> topic_event_handler)
2346 return register_listener(create_item_listener_codec(),
2347 std::move(topic_event_handler));
2351 ITopicImpl::remove_message_listener(boost::uuids::uuid registration_id)
2353 return get_context().get_client_listener_service().deregister_listener(
// Creates the codec used for message-listener registration on this topic:
// a new TopicListenerMessageCodec bound to the proxy name, returned as a
// shared_ptr to spi::impl::ListenerMessageCodec.
2357 std::shared_ptr<spi::impl::ListenerMessageCodec>
2358 ITopicImpl::create_item_listener_codec()
2360 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2361 new TopicListenerMessageCodec(get_name()));
2364 ITopicImpl::TopicListenerMessageCodec::TopicListenerMessageCodec(
2366 : name_(std::move(name))
// Encodes the topic add-message-listener request from the stored name_ and the
// caller-supplied local_only flag; returns the encoded ClientMessage.
2369 protocol::ClientMessage
2370 ITopicImpl::TopicListenerMessageCodec::encode_add_request(
bool local_only)
const
2372 return protocol::codec::topic_addmessagelistener_encode(name_, local_only);
// Encodes the topic remove-message-listener request for the stored name_ and
// the given registration id; returns the encoded ClientMessage.
2375 protocol::ClientMessage
2376 ITopicImpl::TopicListenerMessageCodec::encode_remove_request(
2377 boost::uuids::uuid real_registration_id)
const
2379 return protocol::codec::topic_removemessagelistener_encode(
2380 name_, real_registration_id);
// Constructs the replicated-map proxy by forwarding service name, object name
// and client context to ProxyImpl.
// target_partition_id_ starts at -1 -- presumably a "not yet selected"
// sentinel; confirm where it is first assigned.
2383 ReplicatedMapImpl::ReplicatedMapImpl(
const std::string& service_name,
2384 const std::string& object_name,
2385 spi::ClientContext* context)
2386 : ProxyImpl(service_name, object_name, context)
2387 , target_partition_id_(-1)
// Out-of-class definition of the RingbufferImpl::MAX_BATCH_SIZE constant (1000).
// NOTE(review): presumably the upper bound on items per batch read/write --
// confirm against the ringbuffer API that consumes it.
2390 const int32_t RingbufferImpl::MAX_BATCH_SIZE{ 1000 };
2394 const serialization::pimpl::data&
2395 data_entry_view::get_key()
const
2400 const serialization::pimpl::data&
2401 data_entry_view::get_value()
const
2407 data_entry_view::get_cost()
const
2413 data_entry_view::get_creation_time()
const
2415 return creation_time_;
2419 data_entry_view::get_expiration_time()
const
2421 return expiration_time_;
2425 data_entry_view::get_hits()
const
2431 data_entry_view::get_last_access_time()
const
2433 return last_access_time_;
2437 data_entry_view::get_last_stored_time()
const
2439 return last_stored_time_;
2443 data_entry_view::get_last_update_time()
const
2445 return last_update_time_;
2449 data_entry_view::get_version()
const
2455 data_entry_view::get_ttl()
const
2461 data_entry_view::get_max_idle()
const
2466 data_entry_view::data_entry_view(serialization::pimpl::data&& key,
2467 serialization::pimpl::data&& value,
2469 int64_t creation_time,
2470 int64_t expiration_time,
2472 int64_t last_access_time,
2473 int64_t last_stored_time,
2474 int64_t last_update_time,
2478 : key_(std::move(key))
2479 , value_(std::move(value))
2481 , creation_time_(creation_time)
2482 , expiration_time_(expiration_time)
2484 , last_access_time_(last_access_time)
2485 , last_stored_time_(last_stored_time)
2486 , last_update_time_(last_update_time)
2489 , max_idle_(max_idle)
2495 namespace reliable {
// Default constructor: stamps publish_time_ with the current
// std::chrono::system_clock time; no other member is set here.
2496 ReliableTopicMessage::ReliableTopicMessage()
2497 : publish_time_(std::chrono::system_clock::now())
2500 ReliableTopicMessage::ReliableTopicMessage(
2501 hazelcast::client::serialization::pimpl::data&& payload_data,
2502 std::unique_ptr<address> address)
2503 : publish_time_(std::chrono::system_clock::now())
2504 , payload_(std::move(payload_data))
2507 publisher_address_ = boost::make_optional(*address);
// Returns the stored publish timestamp (a system_clock time_point) by value.
2511 std::chrono::system_clock::time_point
2512 ReliableTopicMessage::get_publish_time()
const
2514 return publish_time_;
// Returns a const reference to the optional publisher address.
// The optional may be empty; the reference is tied to this message object.
2517 const boost::optional<address>&
2518 ReliableTopicMessage::get_publisher_address()
const
2520 return publisher_address_;
2523 serialization::pimpl::data&
2524 ReliableTopicMessage::get_payload()
2532 namespace serialization {
2534 hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_factory_id()
2540 hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_class_id()
2542 return RELIABLE_TOPIC_MESSAGE;
2546 hz_serializer<topic::impl::reliable::ReliableTopicMessage>::write_data(
2547 const topic::impl::reliable::ReliableTopicMessage&
object,
2548 object_data_output& out)
2550 out.write<int64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(
2551 object.publish_time_.time_since_epoch())
2553 out.write_object(
object.publisher_address_);
2554 out.write(
object.payload_.to_byte_array());
2557 topic::impl::reliable::ReliableTopicMessage
2558 hz_serializer<topic::impl::reliable::ReliableTopicMessage>::read_data(
2559 object_data_input& in)
2561 topic::impl::reliable::ReliableTopicMessage message;
2562 auto now = std::chrono::system_clock::now();
2563 message.publish_time_ = now +
2564 std::chrono::milliseconds(in.read<int64_t>()) -
2565 now.time_since_epoch();
2566 message.publisher_address_ = in.read_object<address>();
2568 serialization::pimpl::data(in.read<std::vector<byte>>().value());
2573 entry_event::entry_event(
const std::string& name,
2578 typed_data&& old_value,
2579 typed_data&& merging_value)
2581 , member_(std::move(member))
2582 , event_type_(event_type)
2583 , key_(std::move(key))
2584 , value_(std::move(value))
2585 , old_value_(std::move(old_value))
2586 , merging_value_(std::move(merging_value))
2610 return merging_value_;
2632 operator<<(std::ostream& os,
const entry_event& event)
2634 os <<
"name: " <<
event.name_ <<
" member: " <<
event.member_
2635 <<
" eventType: " <<
static_cast<int>(
event.event_type_)
2636 <<
" key: " << event.key_.
get_type()
2637 <<
" value: " <<
event.value_.get_type()
2638 <<
" oldValue: " <<
event.old_value_.get_type()
2639 <<
" mergingValue: " <<
event.merging_value_.get_type();
2644 entry_event::type event_type,
2645 const std::string& name,
2646 int number_of_entries_affected)
2648 , event_type_(event_type)
2650 , number_of_entries_affected_(number_of_entries_affected)
2674 return number_of_entries_affected_;
2678 operator<<(std::ostream& os,
const map_event& event)
2680 os <<
"MapEvent{member: " <<
event.member_
2681 <<
" eventType: " <<
static_cast<int>(
event.event_type_)
2682 <<
" name: " << event.name_
2683 <<
" numberOfEntriesAffected: " << event.number_of_entries_affected_;
2687 item_event_base::item_event_base(
const std::string& name,
2688 const member& member,
2689 const item_event_type& event_type)
2692 , event_type_(event_type)
// Destructor is defaulted here, out of line, rather than in the class body.
2713 item_event_base::~item_event_base() =
default;
// Constructs the public flake-id generator by forwarding the class's
// SERVICE_NAME, the object name and the client context to
// flake_id_generator_impl.
2715 flake_id_generator::flake_id_generator(
const std::string& object_name,
2716 spi::ClientContext* context)
2717 : flake_id_generator_impl(SERVICE_NAME, object_name, context)
const typed_data & get_key() const
Returns the key of the entry event.
const std::string & get_name() const
Returns the name of the map for this event.
const typed_data & get_old_value() const
Returns the old value of the entry event.
type get_event_type() const
Returns the event type.
const member & get_member() const
Returns the member that fired this event.
const typed_data & get_value() const
Returns the value of the entry event.
const typed_data & get_merging_value() const
Returns the incoming merging value of the entry event.
static const std::chrono::milliseconds UNSET
Default TTL value of a record.
item_event_type get_event_type() const
Returns the event type.
const member & get_member() const
Returns the member that fired this event.
const std::string & get_name() const
Returns the name of the collection for this event.
Map events common contract.
const std::string & get_name() const
Returns the name of the map for this event.
entry_event::type get_event_type() const
Returns the event type.
int get_number_of_entries_affected() const
Returns the number of entries affected by this event.
map_event(member &&member, entry_event::type event_type, const std::string &name, int number_of_entries_affected)
Constructor.
const member & get_member() const
Returns the member that fired this event.
The typed_data class is a wrapper class for serialized binary data.
serialization::pimpl::object_type get_type() const