33 #include <unordered_set>
36 #include "hazelcast/client/impl/ClientLockReferenceIdGenerator.h"
37 #include "hazelcast/client/proxy/PNCounterImpl.h"
38 #include "hazelcast/client/spi/ClientContext.h"
39 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
40 #include "hazelcast/client/proxy/flake_id_generator_impl.h"
41 #include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
42 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
43 #include "hazelcast/client/client_config.h"
44 #include "hazelcast/client/map/data_entry_view.h"
45 #include "hazelcast/client/proxy/RingbufferImpl.h"
46 #include "hazelcast/client/impl/vector_clock.h"
47 #include "hazelcast/client/internal/partition/strategy/StringPartitioningStrategy.h"
48 #include "hazelcast/util/Util.h"
49 #include "hazelcast/client/topic/reliable_listener.h"
// Builds the reliable_topic proxy for `instance_name`.
// Takes the user executor and the logger from the client context, chooses the read batch
// size, and resolves the ringbuffer named TOPIC_RB_PREFIX + instance_name.
55 reliable_topic::reliable_topic(
const std::string &instance_name, spi::ClientContext *context)
56 : proxy::ProxyImpl(reliable_topic::SERVICE_NAME, instance_name, context),
57 executor_(context->get_client_execution_service().get_user_executor()), logger_(context->get_logger()) {
58 auto reliable_config = context->get_client_config().lookup_reliable_topic_config(instance_name);
59 if (reliable_config) {
// A config was found for this topic name: use its read batch size.
60 batch_size_ = reliable_config->get_read_batch_size();
// NOTE(review): presumably the fallback branch (its `else` is not visible in this view).
62 batch_size_ = config::reliable_topic_config::DEFAULT_READ_BATCH_SIZE;
65 ringbuffer_ = context->get_hazelcast_client_implementation()->get_distributed_object<ringbuffer>(
66 std::string(reliable_topic::TOPIC_RB_PREFIX) + instance_name);
// Parses `registration_id` as an int and looks up the matching runner in runners_map_.
// The remainder of the body (what happens to the runner and the returned value) is not
// visible in this view.
// NOTE(review): the parameter text "®istration_id" looks like a mangled "&registration_id"
// (the body uses `registration_id`) — confirm against the real file.
69 bool reliable_topic::remove_message_listener(
const std::string ®istration_id) {
70 int id = util::IOUtil::to_value<int>(registration_id);
71 auto runner = runners_map_.get(
id);
// Shutdown hook: iterates over the entries returned by runners_map_.clear() and calls
// cancel() on each entry's runner.
79 void reliable_topic::on_shutdown() {
81 for (
auto &entry : runners_map_.clear()) {
82 entry.second->cancel();
// Destroy hook: iterates over the entries returned by runners_map_.clear() and calls
// cancel() on each entry's runner.
86 void reliable_topic::on_destroy() {
88 for (
auto &entry : runners_map_.clear()) {
89 entry.second->cancel();
// Post-destroy hook: calls destroy() on the backing ringbuffer and waits on the result
// via .get().
93 void reliable_topic::post_destroy() {
95 ringbuffer_.get()->destroy().get();
// Stores the loss-tolerance flag and the initial sequence id supplied by the caller.
99 reliable_listener::reliable_listener(
bool loss_tolerant, int64_t initial_sequence_id)
100 : loss_tolerant_(loss_tolerant)
101 , initial_sequence_id_(initial_sequence_id) {}
// Starts the reference id counter at 0.
105 ClientLockReferenceIdGenerator::ClientLockReferenceIdGenerator() : reference_id_counter_(0) {}
// Increments the counter and returns the incremented value.
107 int64_t ClientLockReferenceIdGenerator::get_next_reference_id() {
108 return ++reference_id_counter_;
// Builds the multimap proxy under multi_map::SERVICE_NAME and caches the lock reference id
// generator obtained from the client context.
113 MultiMapImpl::MultiMapImpl(
const std::string &instance_name, spi::ClientContext *context)
114 : ProxyImpl(multi_map::SERVICE_NAME, instance_name, context) {
116 lock_reference_id_generator_ = get_context().get_lock_reference_id_generator();
// Encodes a multimap put request (map name, key, value, current thread id) and invokes it
// with `key`; the future yields the bool decoded from the response.
119 boost::future<bool> MultiMapImpl::put(
const serialization::pimpl::data &key,
const serialization::pimpl::data &value) {
120 auto request = protocol::codec::multimap_put_encode(get_name(), key, value,
121 util::get_current_thread_id());
122 return invoke_and_get_future<bool>(request, key);
// Encodes a multimap get request (map name, key, current thread id) and invokes it with
// `key`; the future yields the decoded vector of serialized values.
125 boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::get_data(
const serialization::pimpl::data &key) {
126 auto request = protocol::codec::multimap_get_encode(get_name(), key,
127 util::get_current_thread_id());
128 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, key);
// Encodes a multimap remove-entry request (map name, key, value, current thread id) and
// invokes it with `key`; the future yields the bool decoded from the response.
131 boost::future<bool> MultiMapImpl::remove(
const serialization::pimpl::data &key,
const serialization::pimpl::data &value) {
132 auto request = protocol::codec::multimap_removeentry_encode(get_name(), key, value,
133 util::get_current_thread_id());
134 return invoke_and_get_future<bool>(request, key);
// Encodes a multimap remove request for `key` (map name, key, current thread id) and
// invokes it with `key`; the future yields the decoded vector of serialized values.
137 boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::remove_data(
const serialization::pimpl::data &key) {
138 auto request = protocol::codec::multimap_remove_encode(get_name(), key,
139 util::get_current_thread_id());
140 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, key);
// Encodes a multimap key-set request for this map and invokes it without a key; the future
// yields the decoded vector of serialized data.
143 boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::key_set_data() {
144 auto request = protocol::codec::multimap_keyset_encode(get_name());
145 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
// Encodes a multimap values request for this map and invokes it without a key; the future
// yields the decoded vector of serialized data.
148 boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::values_data() {
149 auto request = protocol::codec::multimap_values_encode(get_name());
150 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
// Encodes a multimap entry-set request for this map and invokes it without a key; the
// future yields the decoded EntryVector.
153 boost::future<EntryVector> MultiMapImpl::entry_set_data() {
154 auto request = protocol::codec::multimap_entryset_encode(get_name());
155 return invoke_and_get_future<EntryVector>(request);
// Encodes a multimap contains-key request (map name, key, current thread id) and invokes
// it with `key`; the future yields the bool decoded from the response.
158 boost::future<bool> MultiMapImpl::contains_key(
const serialization::pimpl::data &key) {
159 auto request = protocol::codec::multimap_containskey_encode(get_name(), key,
160 util::get_current_thread_id());
161 return invoke_and_get_future<bool>(request, key);
// Encodes a multimap contains-value request (map name, value) and invokes it without a
// key; the future yields the bool decoded from the response.
164 boost::future<bool> MultiMapImpl::contains_value(
const serialization::pimpl::data &value) {
165 auto request = protocol::codec::multimap_containsvalue_encode(get_name(), value);
166 return invoke_and_get_future<bool>(request);
// Encodes a multimap contains-entry request (map name, key, value, current thread id) and
// invokes it with `key`; the future yields the bool decoded from the response.
169 boost::future<bool> MultiMapImpl::contains_entry(
const serialization::pimpl::data &key,
170 const serialization::pimpl::data &value) {
171 auto request = protocol::codec::multimap_containsentry_encode(get_name(), key, value,
172 util::get_current_thread_id());
173 return invoke_and_get_future<bool>(request, key);
// Encodes a multimap size request for this map and invokes it without a key; the future
// yields the int decoded from the response.
176 boost::future<int> MultiMapImpl::size() {
177 auto request = protocol::codec::multimap_size_encode(get_name());
178 return invoke_and_get_future<int>(request);
// Encodes a multimap clear request for this map, invokes it, and adapts the invocation
// result to a void future.
181 boost::future<void> MultiMapImpl::clear() {
182 auto request = protocol::codec::multimap_clear_encode(get_name());
183 return to_void_future(invoke(request));
// Encodes a multimap value-count request (map name, key, current thread id) and invokes it
// with `key`; the future yields the int decoded from the response.
186 boost::future<int> MultiMapImpl::value_count(
const serialization::pimpl::data &key) {
187 auto request = protocol::codec::multimap_valuecount_encode(get_name(), key,
188 util::get_current_thread_id());
189 return invoke_and_get_future<int>(request, key);
// Registers `entry_event_handler` as a listener using the map-wide entry listener codec
// built for `include_value`; the future yields the registration uuid.
192 boost::future<boost::uuids::uuid>
193 MultiMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
194 bool include_value) {
195 return register_listener(create_multi_map_entry_listener_codec(include_value), std::move(entry_event_handler));
// Key-scoped overload: registers `entry_event_handler` using the to-key listener codec,
// moving `key` into that codec; the future yields the registration uuid.
198 boost::future<boost::uuids::uuid>
199 MultiMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
200 bool include_value, serialization::pimpl::data &&key) {
201 return register_listener(create_multi_map_entry_listener_codec(include_value, std::move(key)),
202 std::move(entry_event_handler));
// Deregisters the listener identified by `registration_id` through the client listener
// service and returns that service's future.
205 boost::future<bool> MultiMapImpl::remove_entry_listener(boost::uuids::uuid registration_id) {
206 return get_context().get_client_listener_service().deregister_listener(registration_id);
// Convenience overload: forwards to lock(key, lease_time) with a lease time of -1 ms.
// NOTE(review): -1 presumably means "no lease expiry" — confirm against the protocol.
209 boost::future<void> MultiMapImpl::lock(
const serialization::pimpl::data &key) {
210 return lock(key, std::chrono::milliseconds(-1));
// Encodes a multimap lock request (map name, key, current thread id, lease time in ms, a
// fresh lock reference id) and invokes it on the partition computed from `key`; the result
// is adapted to a void future.
213 boost::future<void> MultiMapImpl::lock(
const serialization::pimpl::data &key, std::chrono::milliseconds lease_time) {
214 auto request = protocol::codec::multimap_lock_encode(get_name(), key, util::get_current_thread_id(),
215 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
216 lock_reference_id_generator_->get_next_reference_id());
217 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
// Encodes a multimap is-locked request (map name, key) and invokes it with `key`; the
// future yields the bool decoded from the response.
220 boost::future<bool> MultiMapImpl::is_locked(
const serialization::pimpl::data &key) {
221 auto request = protocol::codec::multimap_islocked_encode(get_name(), key);
222 return invoke_and_get_future<bool>(request, key);
// Encodes a multimap try-lock request for `key` with the current thread id, INT64_MAX and a
// fresh lock reference id, then invokes it with `key`.
// NOTE(review): one argument line between INT64_MAX and the reference id is not visible in
// this view; the three-argument overload passes lease then timeout in that position.
225 boost::future<bool> MultiMapImpl::try_lock(
const serialization::pimpl::data &key) {
226 auto request = protocol::codec::multimap_trylock_encode(get_name(), key,
227 util::get_current_thread_id(), INT64_MAX,
229 lock_reference_id_generator_->get_next_reference_id());
230 return invoke_and_get_future<bool>(request, key);
// Convenience overload: forwards to try_lock(key, timeout, lease_time) with a lease time of
// INT64_MAX milliseconds.
233 boost::future<bool> MultiMapImpl::try_lock(
const serialization::pimpl::data &key, std::chrono::milliseconds timeout) {
234 return try_lock(key, timeout, std::chrono::milliseconds(INT64_MAX));
// Encodes a multimap try-lock request (map name, key, current thread id, lease time in ms,
// timeout in ms, a fresh lock reference id) and invokes it with `key`; the future yields
// the bool decoded from the response.
237 boost::future<bool> MultiMapImpl::try_lock(
const serialization::pimpl::data &key, std::chrono::milliseconds timeout, std::chrono::milliseconds lease_time) {
238 auto request = protocol::codec::multimap_trylock_encode(get_name(), key, util::get_current_thread_id(),
239 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
240 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
241 lock_reference_id_generator_->get_next_reference_id());
242 return invoke_and_get_future<bool>(request, key);
// Encodes a multimap unlock request (map name, key, current thread id, a fresh lock
// reference id) and invokes it on the partition computed from `key`; the result is adapted
// to a void future.
245 boost::future<void> MultiMapImpl::unlock(
const serialization::pimpl::data &key) {
246 auto request = protocol::codec::multimap_unlock_encode(get_name(), key, util::get_current_thread_id(),
247 lock_reference_id_generator_->get_next_reference_id());
248 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
// Encodes a multimap force-unlock request (map name, key, a fresh lock reference id; no
// thread id is passed) and invokes it on the partition computed from `key`; the result is
// adapted to a void future.
251 boost::future<void> MultiMapImpl::force_unlock(
const serialization::pimpl::data &key) {
252 auto request = protocol::codec::multimap_forceunlock_encode(get_name(), key,
253 lock_reference_id_generator_->get_next_reference_id());
254 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
// Creates a MultiMapEntryListenerMessageCodec for this map's name and `include_value`,
// returned through the ListenerMessageCodec base pointer.
257 std::shared_ptr<spi::impl::ListenerMessageCodec>
258 MultiMapImpl::create_multi_map_entry_listener_codec(
bool include_value) {
259 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
260 new MultiMapEntryListenerMessageCodec(get_name(), include_value));
// Creates a MultiMapEntryListenerToKeyCodec for this map's name and `include_value`, moving
// `key` into it; returned through the ListenerMessageCodec base pointer.
263 std::shared_ptr<spi::impl::ListenerMessageCodec>
264 MultiMapImpl::create_multi_map_entry_listener_codec(
bool include_value, serialization::pimpl::data &&key) {
265 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
266 new MultiMapEntryListenerToKeyCodec(get_name(), include_value, std::move(key)));
// Runs the base ProxyImpl initialization, then (re)fetches the lock reference id generator
// from the client context.
269 void MultiMapImpl::on_initialize() {
270 ProxyImpl::on_initialize();
271 lock_reference_id_generator_ = get_context().get_lock_reference_id_generator();
// Stores the map name (moved in) and the include-value flag.
// NOTE(review): the line declaring the `include_value` parameter is not visible in this view.
274 MultiMapImpl::MultiMapEntryListenerMessageCodec::MultiMapEntryListenerMessageCodec(std::string name,
276 : name_(std::move(name)), include_value_(include_value) {}
// Builds the add-entry-listener request from the stored name and include-value flag plus
// the caller's `local_only` flag.
278 protocol::ClientMessage
279 MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_add_request(
bool local_only)
const {
280 return protocol::codec::multimap_addentrylistener_encode(name_, include_value_, local_only);
// Builds the remove-entry-listener request for the stored name and the given registration id.
283 protocol::ClientMessage
284 MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_remove_request(
285 boost::uuids::uuid real_registration_id)
const {
286 return protocol::codec::multimap_removeentrylistener_encode(name_, real_registration_id);
// Builds the add-entry-listener-to-key request from the stored name, key and include-value
// flag.
// NOTE(review): the trailing argument(s) of the call are not visible in this view;
// presumably `local_only` is passed last.
289 protocol::ClientMessage
290 MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_add_request(
bool local_only)
const {
291 return protocol::codec::multimap_addentrylistenertokey_encode(name_, key_, include_value_,
// Builds the remove-entry-listener request for the stored name and the given registration id.
295 protocol::ClientMessage
296 MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_remove_request(
297 boost::uuids::uuid real_registration_id)
const {
298 return protocol::codec::multimap_removeentrylistener_encode(name_, real_registration_id);
// Stores the map name (moved in), the include-value flag and the key (moved in).
// NOTE(review): the line declaring the `include_value` parameter is not visible in this view.
301 MultiMapImpl::MultiMapEntryListenerToKeyCodec::MultiMapEntryListenerToKeyCodec(std::string name,
303 serialization::pimpl::data &&key)
304 : name_(std::move(name)), include_value_(include_value), key_(std::move(key)) {}
// Shared empty member set, passed as the "nothing excluded" argument by the counter
// operations. The shared_ptr itself is const, the set it points to is not.
306 const std::shared_ptr<std::unordered_set<member> > PNCounterImpl::EMPTY_ADDRESS_LIST(
307 new std::unordered_set<member>());
// Builds the PN counter proxy: the cached max replica count starts at 0 (not yet fetched),
// the observed clock starts as a default-constructed vector_clock, and the logger comes
// from the client context.
309 PNCounterImpl::PNCounterImpl(
const std::string &service_name,
const std::string &object_name,
310 spi::ClientContext *context)
311 : ProxyImpl(service_name, object_name, context), max_configured_replica_count_(0),
312 observed_clock_(std::shared_ptr<impl::vector_clock>(new impl::vector_clock())),
313 logger_(context->get_logger()) {
// Streams the proxy as PNCounter{name='<name>'}.
// The return statement is not visible in this view (presumably returns `os`).
316 std::ostream &operator<<(std::ostream &os,
const PNCounterImpl &proxy) {
317 os <<
"PNCounter{name='" << proxy.get_name() <<
"\'}";
// Reads the counter value: picks a target replica with no members excluded and delegates to
// invoke_get_internal with no previous exception.
// NOTE(review): the no_data_member_in_cluster throw is presumably guarded by a null-target
// check on a line not visible in this view.
321 boost::future<int64_t> PNCounterImpl::get() {
322 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
324 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
"ClientPNCounterProxy::get",
325 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
327 return invoke_get_internal(EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds `delta` via invoke_add_internal with getBeforeUpdate = true, on a target replica
// chosen with no members excluded.
// NOTE(review): the no_data_member_in_cluster throw is presumably guarded by a null-target
// check on a line not visible in this view.
330 boost::future<int64_t> PNCounterImpl::get_and_add(int64_t delta) {
331 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
333 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
"ClientPNCounterProxy::getAndAdd",
334 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
336 return invoke_add_internal(delta,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds `delta` via invoke_add_internal with getBeforeUpdate = false, on a target replica
// chosen with no members excluded.
// NOTE(review): the no_data_member_in_cluster throw is presumably guarded by a null-target
// check on a line not visible in this view.
339 boost::future<int64_t> PNCounterImpl::add_and_get(int64_t delta) {
340 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
342 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
"ClientPNCounterProxy::addAndGet",
343 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
345 return invoke_add_internal(delta,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
// Subtracts `delta` by adding -delta via invoke_add_internal with getBeforeUpdate = true.
// NOTE(review): negating delta overflows when delta == INT64_MIN.
// NOTE(review): the no_data_member_in_cluster throw is presumably guarded by a null-target
// check on a line not visible in this view.
348 boost::future<int64_t> PNCounterImpl::get_and_subtract(int64_t delta) {
349 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
351 BOOST_THROW_EXCEPTION(
352 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::getAndSubtract",
353 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
355 return invoke_add_internal(-delta,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
// Subtracts `delta` by adding -delta via invoke_add_internal with getBeforeUpdate = false.
// NOTE(review): negating delta overflows when delta == INT64_MIN.
// NOTE(review): the no_data_member_in_cluster throw is presumably guarded by a null-target
// check on a line not visible in this view.
359 boost::future<int64_t> PNCounterImpl::subtract_and_get(int64_t delta) {
360 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
362 BOOST_THROW_EXCEPTION(
363 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::subtractAndGet",
364 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
366 return invoke_add_internal(-delta,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds -1 via invoke_add_internal with getBeforeUpdate = false.
// NOTE(review): the no_data_member_in_cluster throw is presumably guarded by a null-target
// check on a line not visible in this view.
369 boost::future<int64_t> PNCounterImpl::decrement_and_get() {
370 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
372 BOOST_THROW_EXCEPTION(
373 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::decrementAndGet",
374 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
376 return invoke_add_internal(-1,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds 1 via invoke_add_internal with getBeforeUpdate = false.
// NOTE(review): the no_data_member_in_cluster throw is presumably guarded by a null-target
// check on a line not visible in this view.
379 boost::future<int64_t> PNCounterImpl::increment_and_get() {
380 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
382 BOOST_THROW_EXCEPTION(
383 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::incrementAndGet",
384 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
386 return invoke_add_internal(1,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds -1 via invoke_add_internal with getBeforeUpdate = true.
// NOTE(review): the no_data_member_in_cluster throw is presumably guarded by a null-target
// check on a line not visible in this view.
389 boost::future<int64_t> PNCounterImpl::get_and_decrement() {
390 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
392 BOOST_THROW_EXCEPTION(
393 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::getAndDecrement",
394 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
396 return invoke_add_internal(-1,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
// Adds 1 via invoke_add_internal with getBeforeUpdate = true.
// NOTE(review): the no_data_member_in_cluster throw is presumably guarded by a null-target
// check on a line not visible in this view.
399 boost::future<int64_t> PNCounterImpl::get_and_increment() {
400 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
402 BOOST_THROW_EXCEPTION(
403 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::getAndIncrement",
404 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
406 return invoke_add_internal(1,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
// Replaces the observed clock with a freshly default-constructed vector_clock and returns
// an already-completed future. No request is sent from this function.
409 boost::future<void> PNCounterImpl::reset() {
410 observed_clock_ = std::shared_ptr<impl::vector_clock>(
new impl::vector_clock());
411 return boost::make_ready_future();
// Returns the replica member to send the next operation to, skipping `excluded_addresses`.
// First checks the currently stored target without taking the mutex; if it is unset or
// excluded, takes target_selection_mutex_, re-reads the stored target, and chooses a new
// one only if the condition still holds.
414 boost::shared_ptr<member>
415 PNCounterImpl::get_crdt_operation_target(
const std::unordered_set<member> &excluded_addresses) {
416 auto replicaAddress = current_target_replica_address_.load();
417 if (replicaAddress && excluded_addresses.find(*replicaAddress) == excluded_addresses.end()) {
// Stored target exists and is not excluded: reuse it.
418 return replicaAddress;
422 std::lock_guard<std::mutex> guard(target_selection_mutex_);
// Re-check under the mutex before replacing the stored target.
423 replicaAddress = current_target_replica_address_.load();
424 if (!replicaAddress ||
425 excluded_addresses.find(*replicaAddress) != excluded_addresses.end()) {
426 current_target_replica_address_ = choose_target_replica(excluded_addresses);
429 return current_target_replica_address_;
// Picks one member from get_replica_addresses(excluded_addresses) using rand() and returns
// a newly allocated copy of it.
// NOTE(review): what the empty-list branch returns is not visible in this view (presumably
// a null pointer).
432 boost::shared_ptr<member>
433 PNCounterImpl::choose_target_replica(
const std::unordered_set<member> &excluded_addresses) {
434 std::vector<member> replicaAddresses = get_replica_addresses(excluded_addresses);
435 if (replicaAddresses.empty()) {
439 int randomReplicaIndex = std::abs(rand()) % (int) replicaAddresses.size();
440 return boost::make_shared<member>(replicaAddresses[randomReplicaIndex]);
// Returns the candidate replica members: takes the members selected by
// DATA_MEMBER_SELECTOR, considers only the first min(max configured replica count, number
// of data members) of them, and keeps those not present in `excluded_members`.
443 std::vector<member> PNCounterImpl::get_replica_addresses(
const std::unordered_set<member> &excluded_members) {
444 std::vector<member> dataMembers = get_context().get_client_cluster_service().get_members(
445 *member_selectors::DATA_MEMBER_SELECTOR);
446 int32_t replicaCount = get_max_configured_replica_count();
447 int currentReplicaCount = util::min<int>(replicaCount, (
int) dataMembers.size());
449 std::vector<member> replicaMembers;
450 for (
int i = 0; i < currentReplicaCount; i++) {
451 if (excluded_members.find(dataMembers[i]) == excluded_members.end()) {
452 replicaMembers.push_back(dataMembers[i]);
455 return replicaMembers;
// Returns the configured replica count, using the cached value when it is > 0. Otherwise
// sends a get-configured-replica-count request, blocks on the future with .get(), caches
// the result and returns it.
// NOTE(review): the request's argument line is not visible in this view.
458 int32_t PNCounterImpl::get_max_configured_replica_count() {
459 if (max_configured_replica_count_ > 0) {
460 return max_configured_replica_count_;
462 auto request = protocol::codec::pncounter_getconfiguredreplicacount_encode(
464 max_configured_replica_count_ = invoke_and_get_future<int32_t>(request).get();
466 return max_configured_replica_count_;
// Called after an invocation on `last_target` failed with `last_exception`: logs the
// failure at finest level, adds `last_target` to the excluded set and asks
// get_crdt_operation_target for a target outside that set.
// When the caller passed the shared EMPTY_ADDRESS_LIST, a fresh set is allocated first so
// the insert does not go into the shared instance. `excluded_addresses` is taken by value,
// so that replacement is local to this call.
469 boost::shared_ptr<member>
470 PNCounterImpl::try_choose_a_new_target(std::shared_ptr<std::unordered_set<member>> excluded_addresses,
471 boost::shared_ptr<member> last_target,
472 const exception::hazelcast_ &last_exception) {
473 HZ_LOG(logger_, finest,
474 boost::str(boost::format(
"Exception occurred while invoking operation on target %1%, "
475 "choosing different target. Cause: %2%")
476 % last_target % last_exception)
478 if (excluded_addresses == EMPTY_ADDRESS_LIST) {
480 excluded_addresses = std::make_shared<std::unordered_set<member>>();
482 excluded_addresses->insert(*last_target);
483 return get_crdt_operation_target(*excluded_addresses);
// Sends a PN counter get request to `target`, carrying the observed clock's entries, and
// decodes the reply with get_and_update_timestamps in a synchronously launched continuation.
// On exception::hazelcast_ it retries by calling itself with the current exception and a
// target obtained from try_choose_a_new_target.
// NOTE(review): several lines are not visible in this view. The rethrow/throw at the top is
// presumably guarded by a null-target check, and the two catch handlers presumably belong
// to a try inside the continuation and a try around the invocation respectively — confirm.
486 boost::future<int64_t>
487 PNCounterImpl::invoke_get_internal(std::shared_ptr<std::unordered_set<member>> excluded_addresses,
488 std::exception_ptr last_exception,
489 const boost::shared_ptr<member> &target) {
// Prefer rethrowing the exception from the previous attempt over the generic
// "no data members" error.
491 if (last_exception) {
492 std::rethrow_exception(last_exception);
494 BOOST_THROW_EXCEPTION(
495 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::invokeGetInternal",
496 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
500 auto timestamps = observed_clock_.get()->entry_set();
501 auto request = protocol::codec::pncounter_get_encode(get_name(), timestamps, target->get_uuid());
502 return invoke_on_member(request, target->get_uuid()).then(boost::launch::sync,
503 [=](boost::future<protocol::ClientMessage> f) {
505 return get_and_update_timestamps(
507 }
catch (exception::hazelcast_ &e) {
508 return invoke_get_internal(
510 std::current_exception(),
511 try_choose_a_new_target(
516 }
catch (exception::hazelcast_ &e) {
517 return invoke_get_internal(excluded_addresses, std::current_exception(),
518 try_choose_a_new_target(excluded_addresses, target, e));
// Sends a PN counter add request (delta, getBeforeUpdate, observed clock entries) to
// `target` and decodes the reply with get_and_update_timestamps in a synchronously launched
// continuation. On exception::hazelcast_ it retries by calling itself with the current
// exception and a target obtained from try_choose_a_new_target.
// NOTE(review): several lines are not visible in this view. The rethrow/throw at the top is
// presumably guarded by a null-target check, and the two catch handlers presumably belong
// to a try inside the continuation and a try around the invocation respectively — confirm.
// NOTE(review): the exception source string below says "invokeGetInternal" although this is
// invoke_add_internal — possibly a copy-paste leftover.
522 boost::future<int64_t>
523 PNCounterImpl::invoke_add_internal(int64_t delta,
bool getBeforeUpdate,
524 std::shared_ptr<std::unordered_set<member> > excluded_addresses,
525 std::exception_ptr last_exception,
526 const boost::shared_ptr<member> &target) {
528 if (last_exception) {
529 std::rethrow_exception(last_exception);
531 BOOST_THROW_EXCEPTION(
532 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::invokeGetInternal",
533 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
538 auto request = protocol::codec::pncounter_add_encode(
539 get_name(), delta, getBeforeUpdate, observed_clock_.get()->entry_set(), target->get_uuid());
540 return invoke_on_member(request, target->get_uuid()).then(boost::launch::sync,
541 [=](boost::future<protocol::ClientMessage> f) {
543 return get_and_update_timestamps(
545 }
catch (exception::hazelcast_ &e) {
546 return invoke_add_internal(delta,
549 std::current_exception(),
550 try_choose_a_new_target(
556 }
catch (exception::hazelcast_ &e) {
557 return invoke_add_internal(delta, getBeforeUpdate, excluded_addresses, std::current_exception(),
558 try_choose_a_new_target(excluded_addresses, target, e));
// Decodes a counter response: reads the first fixed-size int64 field as the value, then
// feeds the timestamp vector read from the message to update_observed_replica_timestamps.
// NOTE(review): the definition of `msg` (presumably obtained from `f`) and the return
// statement are not visible in this view.
562 int64_t PNCounterImpl::get_and_update_timestamps(boost::future<protocol::ClientMessage> f) {
564 auto value = msg.get_first_fixed_sized_field<int64_t>();
567 update_observed_replica_timestamps(msg.get<impl::vector_clock::timestamp_vector>());
// Converts the received timestamps to a vector_clock and tries to install it as the
// observed clock with compare_and_set against the clock read just before.
// NOTE(review): the lines between the visible ones are missing from this view. Presumably
// the function returns when the current clock is already after the received one and
// retries when the compare_and_set fails — confirm.
571 void PNCounterImpl::update_observed_replica_timestamps(
572 const impl::vector_clock::timestamp_vector &received_logical_timestamps) {
573 std::shared_ptr<impl::vector_clock> received = to_vector_clock(received_logical_timestamps);
575 std::shared_ptr<impl::vector_clock> currentClock = this->observed_clock_;
576 if (currentClock->is_after(*received)) {
579 if (observed_clock_.compare_and_set(currentClock, received)) {
// Wraps the given timestamp vector in a newly allocated vector_clock.
585 std::shared_ptr<impl::vector_clock> PNCounterImpl::to_vector_clock(
586 const impl::vector_clock::timestamp_vector &replica_logical_timestamps) {
587 return std::shared_ptr<impl::vector_clock>(
588 new impl::vector_clock(replica_logical_timestamps));
// Returns the currently stored target replica (may be null if none has been chosen).
591 boost::shared_ptr<member> PNCounterImpl::get_current_target_replica_address() {
592 return current_target_replica_address_.load();
// Builds the list proxy under service name "hz:impl:listService" and computes the partition
// id once from a serialized string; every list operation here is sent to that partition.
// NOTE(review): the argument to to_data is not visible in this view (presumably the list's
// name or partition key).
595 IListImpl::IListImpl(
const std::string &instance_name, spi::ClientContext *context)
596 : ProxyImpl(
"hz:impl:listService", instance_name, context) {
597 serialization::pimpl::data key_data = get_context().get_serialization_service().to_data<std::string>(
599 partition_id_ = get_partition_id(key_data);
// Deregisters the listener identified by `registration_id` through the client listener
// service and returns that service's future.
602 boost::future<bool> IListImpl::remove_item_listener(boost::uuids::uuid registration_id) {
603 return get_context().get_client_listener_service().deregister_listener(registration_id);
// Encodes a list size request and invokes it on partition_id_; the future yields the
// decoded int.
606 boost::future<int> IListImpl::size() {
607 auto request = protocol::codec::list_size_encode(get_name());
608 return invoke_and_get_future<int>(request, partition_id_);
// Encodes a list is-empty request and invokes it on partition_id_; the future yields the
// decoded bool.
611 boost::future<bool> IListImpl::is_empty() {
612 auto request = protocol::codec::list_isempty_encode(get_name());
613 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a list contains request for `element` and invokes it on partition_id_; the future
// yields the decoded bool.
616 boost::future<bool> IListImpl::contains(
const serialization::pimpl::data &element) {
617 auto request = protocol::codec::list_contains_encode(get_name(), element);
618 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a list get-all request and invokes it on partition_id_; the future yields the
// decoded vector of serialized elements.
621 boost::future<std::vector<serialization::pimpl::data>> IListImpl::to_array_data() {
622 auto request = protocol::codec::list_getall_encode(get_name());
623 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
// Encodes a list add request for `element` and invokes it on partition_id_; the future
// yields the decoded bool.
626 boost::future<bool> IListImpl::add(
const serialization::pimpl::data &element) {
627 auto request = protocol::codec::list_add_encode(get_name(), element);
628 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a list remove request for `element` and invokes it on partition_id_; the future
// yields the decoded bool.
631 boost::future<bool> IListImpl::remove(
const serialization::pimpl::data &element) {
632 auto request = protocol::codec::list_remove_encode(get_name(), element);
633 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a list contains-all request for `elements` and invokes it on partition_id_; the
// future yields the decoded bool.
636 boost::future<bool> IListImpl::contains_all_data(
const std::vector<serialization::pimpl::data> &elements) {
637 auto request = protocol::codec::list_containsall_encode(get_name(), elements);
638 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a list add-all request for `elements` and invokes it on partition_id_; the future
// yields the decoded bool.
641 boost::future<bool> IListImpl::add_all_data(
const std::vector<serialization::pimpl::data> &elements) {
642 auto request = protocol::codec::list_addall_encode(get_name(), elements);
643 return invoke_and_get_future<bool>(request, partition_id_);
// Indexed overload: encodes a list add-all-with-index request starting with the list name
// and `index`, and invokes it on partition_id_; the future yields the decoded bool.
// NOTE(review): the remaining encode arguments are not visible in this view (presumably
// `elements`).
646 boost::future<bool> IListImpl::add_all_data(
int index,
const std::vector<serialization::pimpl::data> &elements) {
647 auto request = protocol::codec::list_addallwithindex_encode(get_name(), index,
649 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a list compare-and-remove-all request for `elements` and invokes it on
// partition_id_; the future yields the decoded bool.
652 boost::future<bool> IListImpl::remove_all_data(
const std::vector<serialization::pimpl::data> &elements) {
653 auto request = protocol::codec::list_compareandremoveall_encode(get_name(), elements);
654 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a list compare-and-retain-all request for `elements` and invokes it on
// partition_id_; the future yields the decoded bool.
657 boost::future<bool> IListImpl::retain_all_data(
const std::vector<serialization::pimpl::data> &elements) {
658 auto request = protocol::codec::list_compareandretainall_encode(get_name(), elements);
659 return invoke_and_get_future<bool>(request, partition_id_);
// Encodes a list clear request, invokes it on partition_id_, and adapts the result to a
// void future.
662 boost::future<void> IListImpl::clear() {
663 auto request = protocol::codec::list_clear_encode(get_name());
664 return to_void_future(invoke_on_partition(request, partition_id_));
// Encodes a list get request for `index` and invokes it on partition_id_; the future yields
// an optional serialized element.
667 boost::future<boost::optional<serialization::pimpl::data>> IListImpl::get_data(
int index) {
668 auto request = protocol::codec::list_get_encode(get_name(), index);
669 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
// Encodes a list set request (index, element) and invokes it on partition_id_; the future
// yields an optional serialized element decoded from the response.
672 boost::future<boost::optional<serialization::pimpl::data>> IListImpl::set_data(
int index,
673 const serialization::pimpl::data &element) {
674 auto request = protocol::codec::list_set_encode(get_name(), index, element);
675 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
// Indexed overload: encodes a list add-with-index request (index, element), invokes it on
// partition_id_, and adapts the result to a void future.
678 boost::future<void> IListImpl::add(
int index,
const serialization::pimpl::data &element) {
679 auto request = protocol::codec::list_addwithindex_encode(get_name(), index, element);
680 return to_void_future(invoke_on_partition(request, partition_id_));
// Encodes a list remove-with-index request for `index` and invokes it on partition_id_; the
// future yields an optional serialized element decoded from the response.
683 boost::future<boost::optional<serialization::pimpl::data>> IListImpl::remove_data(
int index) {
684 auto request = protocol::codec::list_removewithindex_encode(get_name(), index);
685 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
// Encodes a list index-of request for `element` and invokes it on partition_id_; the future
// yields the decoded int.
688 boost::future<int> IListImpl::index_of(
const serialization::pimpl::data &element) {
689 auto request = protocol::codec::list_indexof_encode(get_name(), element);
690 return invoke_and_get_future<int>(request, partition_id_);
// Encodes a list last-index-of request for `element` and invokes it on partition_id_; the
// future yields the decoded int.
693 boost::future<int> IListImpl::last_index_of(
const serialization::pimpl::data &element) {
694 auto request = protocol::codec::list_lastindexof_encode(get_name(), element);
695 return invoke_and_get_future<int>(request, partition_id_);
// Encodes a list sub-list request (from_index, to_index) and invokes it on partition_id_;
// the future yields the decoded vector of serialized elements.
698 boost::future<std::vector<serialization::pimpl::data>> IListImpl::sub_list_data(
int from_index,
int to_index) {
699 auto request = protocol::codec::list_sub_encode(get_name(), from_index, to_index);
700 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
// Creates a ListListenerMessageCodec for this list's name and `include_value`, returned
// through the ListenerMessageCodec base pointer.
703 std::shared_ptr<spi::impl::ListenerMessageCodec> IListImpl::create_item_listener_codec(
bool include_value) {
704 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
705 new ListListenerMessageCodec(get_name(), include_value));
// Stores the list name (moved in).
// NOTE(review): the rest of the initializer list is not visible in this view (presumably it
// also stores `include_value`).
708 IListImpl::ListListenerMessageCodec::ListListenerMessageCodec(std::string name,
709 bool include_value) : name_(std::move(name)),
// Builds the list add-listener request from the stored name and include-value flag plus the
// caller's `local_only` flag.
713 protocol::ClientMessage
714 IListImpl::ListListenerMessageCodec::encode_add_request(
bool local_only)
const {
715 return protocol::codec::list_addlistener_encode(name_, include_value_, local_only);
// Builds the list remove-listener request for the stored name and the given registration id.
718 protocol::ClientMessage
719 IListImpl::ListListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id)
const {
720 return protocol::codec::list_removelistener_encode(name_, real_registration_id);
// Creates a block over `id_batch` that becomes invalid `validity` after construction
// (measured on steady_clock), with no ids handed out yet.
// NOTE(review): `id_batch` is an rvalue-reference parameter but is copied, not moved, into
// id_batch_.
723 flake_id_generator_impl::Block::Block(IdBatch &&id_batch, std::chrono::milliseconds validity)
724 : id_batch_(id_batch), invalid_since_(std::chrono::steady_clock::now() + validity), num_returned_(0) {}
// Hands out the next id of this block: claims an index by advancing num_returned_ with
// compare_exchange_strong (retrying until it succeeds) and returns base + index * increment.
// NOTE(review): what is returned when the block has expired or is exhausted is not visible
// in this view; callers in this file compare the result against INT64_MIN, so presumably
// that sentinel — confirm.
726 int64_t flake_id_generator_impl::Block::next() {
727 if (invalid_since_ <= std::chrono::steady_clock::now()) {
732 index = num_returned_;
733 if (index == id_batch_.get_batch_size()) {
736 }
while (!num_returned_.compare_exchange_strong(index, index + 1));
738 return id_batch_.get_base() + index * id_batch_.get_increment();
// Definition of the static, default-constructed IdIterator `endOfBatch`.
741 flake_id_generator_impl::IdBatch::IdIterator flake_id_generator_impl::IdBatch::endOfBatch;
// Accessor for the batch base; the body is not visible in this view (presumably returns
// base_).
743 const int64_t flake_id_generator_impl::IdBatch::get_base()
const {
// Accessor for the batch increment; the body is not visible in this view (presumably
// returns increment_).
747 const int64_t flake_id_generator_impl::IdBatch::get_increment()
const {
// Accessor for the batch size; the body is not visible in this view (presumably returns
// batch_size_).
751 const int32_t flake_id_generator_impl::IdBatch::get_batch_size()
const {
// Stores the base id, the increment between consecutive ids, and the batch size.
755 flake_id_generator_impl::IdBatch::IdBatch(int64_t base, int64_t increment, int32_t batch_size)
756 : base_(base), increment_(increment), batch_size_(batch_size) {}
// Returns a reference to the end iterator; the body is not visible in this view (presumably
// returns the static endOfBatch).
758 flake_id_generator_impl::IdBatch::IdIterator &flake_id_generator_impl::IdBatch::end() {
// Returns an IdIterator constructed from this batch's base, increment and batch size.
762 flake_id_generator_impl::IdBatch::IdIterator flake_id_generator_impl::IdBatch::iterator() {
763 return flake_id_generator_impl::IdBatch::IdIterator(base_, increment_, batch_size_);
// Stores the current id (`base2`), the increment, and the number of remaining ids.
766 flake_id_generator_impl::IdBatch::IdIterator::IdIterator(int64_t base2,
const int64_t increment, int32_t remaining) : base2_(
767 base2), increment_(increment), remaining_(remaining) {}
// Two iterators are equal when base2_, increment_ and remaining_ all match.
769 bool flake_id_generator_impl::IdBatch::IdIterator::operator==(
const flake_id_generator_impl::IdBatch::IdIterator &rhs)
const {
770 return base2_ == rhs.base2_ && increment_ == rhs.increment_ && remaining_ == rhs.remaining_;
// Negation of operator==.
773 bool flake_id_generator_impl::IdBatch::IdIterator::operator!=(
const flake_id_generator_impl::IdBatch::IdIterator &rhs)
const {
774 return !(rhs == *
this);
// Default constructor: sets base2_, increment_ and remaining_ to -1.
777 flake_id_generator_impl::IdBatch::IdIterator::IdIterator() : base2_(-1), increment_(-1), remaining_(-1) {
// Pre-increment: returns IdBatch::end() when no ids remain, otherwise advances the current
// id by increment_.
// NOTE(review): the lines around the advance are not visible in this view (presumably
// remaining_ is decremented and *this is returned).
780 flake_id_generator_impl::IdBatch::IdIterator &flake_id_generator_impl::IdBatch::IdIterator::operator++() {
781 if (remaining_ == 0) {
782 return flake_id_generator_impl::IdBatch::end();
787 base2_ += increment_;
// Builds the flake id generator proxy with no current block, then reads the prefetch count
// (batch size) and prefetch validity duration from the flake id generator config found for
// `object_name`.
// NOTE(review): `config` is dereferenced without a visible null check — presumably
// find_flake_id_generator_config never returns null; confirm.
793 flake_id_generator_impl::flake_id_generator_impl(
const std::string &service_name,
const std::string &object_name,
794 spi::ClientContext *context)
795 : ProxyImpl(service_name, object_name, context), block_(nullptr) {
796 auto config = context->get_client_config().find_flake_id_generator_config(object_name);
797 batch_size_ = config->get_prefetch_count();
798 validity_ = config->get_prefetch_validity_duration();
// Tries to take the next id from the currently loaded block; throws std::overflow_error
// with an empty message when no id can be produced locally.
// NOTE(review): the null check on `b` and the `return res` are not visible in this view.
// INT64_MIN is treated as the "block cannot supply an id" sentinel.
801 int64_t flake_id_generator_impl::new_id_internal() {
802 auto b = block_.load();
804 int64_t res = b->next();
805 if (res != INT64_MIN) {
810 throw std::overflow_error(
"");
// Returns a new id. Fast path: an already-completed future holding an id from the current
// block. When new_id_internal throws std::overflow_error, requests a new batch of
// batch_size_ ids and, in a synchronously launched continuation, builds a new Block from
// it, takes one id from that block, and tries to publish the block with
// compare_exchange_strong against the value of block_ loaded just before.
// NOTE(review): the `try {` line, the Block's second constructor argument and the
// continuation's return are not visible in this view.
813 boost::future<int64_t> flake_id_generator_impl::new_id() {
815 return boost::make_ready_future(new_id_internal());
816 }
catch (std::overflow_error &) {
817 return new_id_batch(batch_size_).then(boost::launch::sync,
818 [=](boost::future<flake_id_generator_impl::IdBatch> f) {
819 auto newBlock = boost::make_shared<Block>(f.get(),
821 auto value = newBlock->next();
822 auto b = block_.load();
823 block_.compare_exchange_strong(b, newBlock);
// Requests a new id batch from the cluster. The synchronously launched continuation moves
// the message read pointer to RESPONSE_HEADER_LEN, then reads base (int64), increment
// (int64) and batch size (int32) in that order and wraps them in an IdBatch.
// NOTE(review): the encode arguments and the definition of `msg` (presumably f.get()) are
// not visible in this view.
829 boost::future<flake_id_generator_impl::IdBatch> flake_id_generator_impl::new_id_batch(int32_t size) {
830 auto request = protocol::codec::flakeidgenerator_newidbatch_encode(
832 return invoke(request).then(boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
834 msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN);
836 auto base = msg.get<int64_t>();
837 auto increment = msg.get<int64_t>();
838 auto batch_size = msg.get<int32_t>();
839 return flake_id_generator_impl::IdBatch(base, increment, batch_size);
// Builds the queue proxy under service name "hz:impl:queueService" and computes the
// partition id once from a serialized string; every queue operation here is sent to that
// partition.
// NOTE(review): the argument to to_data is not visible in this view (presumably the queue's
// name or partition key).
843 IQueueImpl::IQueueImpl(
const std::string &instance_name, spi::ClientContext *context)
844 : ProxyImpl(
"hz:impl:queueService", instance_name, context) {
845 serialization::pimpl::data data = get_context().get_serialization_service().to_data<std::string>(
847 partition_id_ = get_partition_id(data);
850 boost::future<bool> IQueueImpl::remove_item_listener(
851 boost::uuids::uuid registration_id) {
852 return get_context().get_client_listener_service().deregister_listener(registration_id);
855 boost::future<bool> IQueueImpl::offer(
const serialization::pimpl::data &element, std::chrono::milliseconds timeout) {
856 auto request = protocol::codec::queue_offer_encode(get_name(), element,
857 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
858 return invoke_and_get_future<bool>(request, partition_id_);
861 boost::future<void> IQueueImpl::put(
const serialization::pimpl::data &element) {
862 auto request = protocol::codec::queue_put_encode(get_name(), element);
863 return to_void_future(invoke_on_partition(request, partition_id_));
866 boost::future<boost::optional<serialization::pimpl::data>> IQueueImpl::poll_data(std::chrono::milliseconds timeout) {
867 auto request = protocol::codec::queue_poll_encode(get_name(), std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
868 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
871 boost::future<int> IQueueImpl::remaining_capacity() {
872 auto request = protocol::codec::queue_remainingcapacity_encode(get_name());
873 return invoke_and_get_future<int>(request, partition_id_);
876 boost::future<bool> IQueueImpl::remove(
const serialization::pimpl::data &element) {
877 auto request = protocol::codec::queue_remove_encode(get_name(), element);
878 return invoke_and_get_future<bool>(request, partition_id_);
881 boost::future<bool> IQueueImpl::contains(
const serialization::pimpl::data &element) {
882 auto request = protocol::codec::queue_contains_encode(get_name(), element);
883 return invoke_and_get_future<bool>(request, partition_id_);
886 boost::future<std::vector<serialization::pimpl::data>> IQueueImpl::drain_to_data(
size_t max_elements) {
887 auto request = protocol::codec::queue_draintomaxsize_encode(get_name(), (int32_t) max_elements);
889 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
892 boost::future<std::vector<serialization::pimpl::data>> IQueueImpl::drain_to_data() {
893 auto request = protocol::codec::queue_drainto_encode(get_name());
894 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
897 boost::future<boost::optional<serialization::pimpl::data>> IQueueImpl::take_data() {
898 auto request = protocol::codec::queue_take_encode(get_name());
899 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
902 boost::future<boost::optional<serialization::pimpl::data>> IQueueImpl::peek_data() {
903 auto request = protocol::codec::queue_peek_encode(get_name());
904 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
907 boost::future<int> IQueueImpl::size() {
908 auto request = protocol::codec::queue_size_encode(get_name());
909 return invoke_and_get_future<int>(request, partition_id_);
912 boost::future<bool> IQueueImpl::is_empty() {
913 auto request = protocol::codec::queue_isempty_encode(get_name());
914 return invoke_and_get_future<bool>(request, partition_id_);
917 boost::future<std::vector<serialization::pimpl::data>> IQueueImpl::to_array_data() {
918 auto request = protocol::codec::queue_iterator_encode(get_name());
919 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
922 boost::future<bool> IQueueImpl::contains_all_data(
const std::vector<serialization::pimpl::data> &elements) {
923 auto request = protocol::codec::queue_containsall_encode(get_name(), elements);
924 return invoke_and_get_future<bool>(request, partition_id_);
927 boost::future<bool> IQueueImpl::add_all_data(
const std::vector<serialization::pimpl::data> &elements) {
928 auto request = protocol::codec::queue_addall_encode(get_name(), elements);
929 return invoke_and_get_future<bool>(request, partition_id_);
932 boost::future<bool> IQueueImpl::remove_all_data(
const std::vector<serialization::pimpl::data> &elements) {
933 auto request = protocol::codec::queue_compareandremoveall_encode(get_name(), elements);
934 return invoke_and_get_future<bool>(request, partition_id_);
937 boost::future<bool> IQueueImpl::retain_all_data(
const std::vector<serialization::pimpl::data> &elements) {
938 auto request = protocol::codec::queue_compareandretainall_encode(get_name(), elements);
939 return invoke_and_get_future<bool>(request, partition_id_);
942 boost::future<void> IQueueImpl::clear() {
943 auto request = protocol::codec::queue_clear_encode(get_name());
944 return to_void_future(invoke_on_partition(request, partition_id_));
947 std::shared_ptr<spi::impl::ListenerMessageCodec>
948 IQueueImpl::create_item_listener_codec(
bool include_value) {
949 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
950 new QueueListenerMessageCodec(get_name(), include_value));
// Codec constructor: takes the queue name by value and moves it into name_.
// NOTE(review): the rest of the initializer list (presumably include_value_) and the body
// are on lines not visible in this excerpt - confirm against the full file.
953 IQueueImpl::QueueListenerMessageCodec::QueueListenerMessageCodec(std::string name,
954 bool include_value) : name_(std::move(name)),
958 protocol::ClientMessage
959 IQueueImpl::QueueListenerMessageCodec::encode_add_request(
bool local_only)
const {
960 return protocol::codec::queue_addlistener_encode(name_, include_value_, local_only);
963 protocol::ClientMessage
964 IQueueImpl::QueueListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id)
const {
965 return protocol::codec::queue_removelistener_encode(name_, real_registration_id);
968 ProxyImpl::ProxyImpl(
const std::string &service_name,
const std::string &object_name,
969 spi::ClientContext *context)
970 : ClientProxy(object_name, service_name, *context), SerializingProxy(*context, object_name) {}
972 ProxyImpl::~ProxyImpl() =
default;
974 SerializingProxy::SerializingProxy(spi::ClientContext &context,
const std::string &object_name)
975 : serialization_service_(context.get_serialization_service()),
976 partition_service_(context.get_partition_service()), object_name_(object_name), client_context_(context) {}
978 int SerializingProxy::get_partition_id(
const serialization::pimpl::data &key) {
979 return partition_service_.get_partition_id(key);
982 boost::future<protocol::ClientMessage> SerializingProxy::invoke_on_partition(
983 protocol::ClientMessage &request,
int partition_id) {
985 return spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_, partition_id)->invoke();
986 }
catch (exception::iexception &) {
987 util::exception_util::rethrow(std::current_exception());
988 return boost::make_ready_future(protocol::ClientMessage(0));
992 boost::future<protocol::ClientMessage> SerializingProxy::invoke(protocol::ClientMessage &request) {
994 return spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_)->invoke();
995 }
catch (exception::iexception &) {
996 util::exception_util::rethrow(std::current_exception());
997 return boost::make_ready_future(protocol::ClientMessage(0));
1001 boost::future<protocol::ClientMessage> SerializingProxy::invoke_on_connection(protocol::ClientMessage &request,
1002 std::shared_ptr<connection::Connection> connection) {
1004 return spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_, connection)->invoke();
1005 }
catch (exception::iexception &) {
1006 util::exception_util::rethrow(std::current_exception());
1007 return boost::make_ready_future(protocol::ClientMessage(0));
1011 boost::future<protocol::ClientMessage>
1012 SerializingProxy::invoke_on_key_owner(protocol::ClientMessage &request,
1013 const serialization::pimpl::data &key_data) {
1015 return invoke_on_partition(request, get_partition_id(key_data));
1016 }
catch (exception::iexception &) {
1017 util::exception_util::rethrow(std::current_exception());
1018 return boost::make_ready_future(protocol::ClientMessage(0));
1022 boost::future<protocol::ClientMessage>
1023 SerializingProxy::invoke_on_member(protocol::ClientMessage &request, boost::uuids::uuid uuid) {
1025 auto invocation = spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_, uuid);
1026 return invocation->invoke();
1027 }
catch (exception::iexception &) {
1028 util::exception_util::rethrow(std::current_exception());
1029 return boost::make_ready_future(protocol::ClientMessage(0));
1034 boost::future<boost::optional<serialization::pimpl::data>>
1035 SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request) {
1036 return decode_optional_var_sized<serialization::pimpl::data>(invoke(request));
1040 boost::future<boost::optional<map::data_entry_view>>
1041 SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request,
const serialization::pimpl::data &key) {
1042 return decode_optional_var_sized<map::data_entry_view>(invoke_on_key_owner(request, key));
1046 boost::future<boost::optional<serialization::pimpl::data>>
1047 SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request,
int partition_id) {
1048 return decode_optional_var_sized<serialization::pimpl::data>(invoke_on_partition(request, partition_id));
1052 boost::future<boost::optional<serialization::pimpl::data>>
1053 SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request,
const serialization::pimpl::data &key) {
1054 return decode_optional_var_sized<serialization::pimpl::data>(invoke_on_key_owner(request, key));
1057 PartitionSpecificClientProxy::PartitionSpecificClientProxy(
const std::string &service_name,
1058 const std::string &object_name,
1059 spi::ClientContext *context) : ProxyImpl(
1060 service_name, object_name, context), partition_id_(-1) {}
// Computes a partition key via StringPartitioningStrategy::get_partition_key, serializes it
// and stores the resulting partition id in partition_id_.
// NOTE(review): the argument passed to get_partition_key(...) is on a line not visible in
// this excerpt - confirm against the full file.
1062 void PartitionSpecificClientProxy::on_initialize() {
1063 std::string partitionKey = internal::partition::strategy::StringPartitioningStrategy::get_partition_key(
1065 partition_id_ = get_context().get_partition_service().get_partition_id(to_data<std::string>(partitionKey));
1068 IMapImpl::IMapImpl(
const std::string &instance_name, spi::ClientContext *context)
1069 : ProxyImpl(
"hz:impl:mapService", instance_name, context) {}
1071 boost::future<bool> IMapImpl::contains_key(
const serialization::pimpl::data &key) {
1072 auto request = protocol::codec::map_containskey_encode(get_name(), key, util::get_current_thread_id());
1073 return invoke_and_get_future<bool>(request, key);
1076 boost::future<bool> IMapImpl::contains_value(
const serialization::pimpl::data &value) {
1077 auto request = protocol::codec::map_containsvalue_encode(get_name(), value);
1078 return invoke_and_get_future<bool>(request);
1081 boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::get_data(
const serialization::pimpl::data &key) {
1082 auto request = protocol::codec::map_get_encode(get_name(), key, util::get_current_thread_id());
1083 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1086 boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::remove_data(
const serialization::pimpl::data &key) {
1087 auto request = protocol::codec::map_remove_encode(get_name(), key, util::get_current_thread_id());
1088 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1091 boost::future<bool> IMapImpl::remove(
const serialization::pimpl::data &key,
const serialization::pimpl::data &value) {
1092 auto request = protocol::codec::map_removeifsame_encode(get_name(), key, value,
1093 util::get_current_thread_id());
1094 return invoke_and_get_future<bool>(request, key);
1097 boost::future<protocol::ClientMessage> IMapImpl::remove_all(
const serialization::pimpl::data &predicate_data) {
1098 auto request = protocol::codec::map_removeall_encode(get_name(), predicate_data);
1099 return invoke(request);
1102 boost::future<protocol::ClientMessage> IMapImpl::delete_entry(
const serialization::pimpl::data &key) {
1103 auto request = protocol::codec::map_delete_encode(get_name(), key, util::get_current_thread_id());
1104 return invoke_on_partition(request, get_partition_id(key));
1107 boost::future<protocol::ClientMessage> IMapImpl::flush() {
1108 auto request = protocol::codec::map_flush_encode(get_name());
1109 return invoke(request);
1112 boost::future<bool> IMapImpl::try_remove(
const serialization::pimpl::data &key, std::chrono::milliseconds timeout) {
1113 auto request = protocol::codec::map_tryremove_encode(get_name(), key,
1114 util::get_current_thread_id(),
1115 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1117 return invoke_and_get_future<bool>(request, key);
1120 boost::future<bool> IMapImpl::try_put(
const serialization::pimpl::data &key,
const serialization::pimpl::data &value,
1121 std::chrono::milliseconds timeout) {
1122 auto request = protocol::codec::map_tryput_encode(get_name(), key, value,
1123 util::get_current_thread_id(),
1124 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1126 return invoke_and_get_future<bool>(request, key);
1129 boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::put_data(
const serialization::pimpl::data &key,
1130 const serialization::pimpl::data &value,
1131 std::chrono::milliseconds ttl) {
1132 auto request = protocol::codec::map_put_encode(get_name(), key, value,
1133 util::get_current_thread_id(),
1134 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1135 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1138 boost::future<protocol::ClientMessage> IMapImpl::put_transient(
const serialization::pimpl::data &key,
const serialization::pimpl::data &value,
1139 std::chrono::milliseconds ttl) {
1140 auto request = protocol::codec::map_puttransient_encode(get_name(), key, value,
1141 util::get_current_thread_id(),
1142 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1143 return invoke_on_partition(request, get_partition_id(key));
1146 boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::put_if_absent_data(
const serialization::pimpl::data &key,
1147 const serialization::pimpl::data &value,
1148 std::chrono::milliseconds ttl) {
1149 auto request = protocol::codec::map_putifabsent_encode(get_name(), key, value,
1150 util::get_current_thread_id(),
1151 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1152 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
// Encodes a map "replace if same" request for the key and invokes it with the key.
// NOTE(review): one argument line of the encode call (presumably new_value) is not visible
// in this excerpt - confirm against the full file.
1155 boost::future<bool> IMapImpl::replace(
const serialization::pimpl::data &key,
const serialization::pimpl::data &old_value,
1156 const serialization::pimpl::data &new_value) {
1157 auto request = protocol::codec::map_replaceifsame_encode(get_name(), key, old_value,
1159 util::get_current_thread_id());
1161 return invoke_and_get_future<bool>(request, key);
1164 boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::replace_data(
const serialization::pimpl::data &key,
1165 const serialization::pimpl::data &value) {
1166 auto request = protocol::codec::map_replace_encode(get_name(), key, value,
1167 util::get_current_thread_id());
1169 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
// Encodes a map "set" request (key, value, current thread id and a millisecond duration)
// and invokes it on the key's partition.
// NOTE(review): the tail of the duration_cast expression (presumably ttl).count()) is on a
// line not visible in this excerpt - confirm against the full file.
1172 boost::future<protocol::ClientMessage>
1173 IMapImpl::set(
const serialization::pimpl::data &key,
const serialization::pimpl::data &value,
1174 std::chrono::milliseconds ttl) {
1175 auto request = protocol::codec::map_set_encode(get_name(), key, value,
1176 util::get_current_thread_id(),
1177 std::chrono::duration_cast<std::chrono::milliseconds>(
1179 return invoke_on_partition(request, get_partition_id(key));
1182 boost::future<protocol::ClientMessage> IMapImpl::lock(
const serialization::pimpl::data &key) {
1183 return lock(key, std::chrono::milliseconds(-1));
1186 boost::future<protocol::ClientMessage> IMapImpl::lock(
const serialization::pimpl::data &key, std::chrono::milliseconds lease_time) {
1187 auto request = protocol::codec::map_lock_encode(get_name(), key, util::get_current_thread_id(),
1188 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1189 lock_reference_id_generator_->get_next_reference_id());
1190 return invoke_on_partition(request, get_partition_id(key));
1193 boost::future<bool> IMapImpl::is_locked(
const serialization::pimpl::data &key) {
1194 auto request = protocol::codec::map_islocked_encode(get_name(), key);
1196 return invoke_and_get_future<bool>(request, key);
1199 boost::future<bool> IMapImpl::try_lock(
const serialization::pimpl::data &key, std::chrono::milliseconds timeout) {
1200 return try_lock(key, timeout, std::chrono::milliseconds(-1));
1204 IMapImpl::try_lock(
const serialization::pimpl::data &key, std::chrono::milliseconds timeout,
1205 std::chrono::milliseconds lease_time) {
// Encodes a map "try lock" request. Note the argument order: lease_time is encoded before
// timeout, followed by the next lock reference id. The request is then invoked with the key.
// NOTE(review): the return-type line of this definition is not visible in this excerpt.
1206 auto request = protocol::codec::map_trylock_encode(get_name(), key, util::get_current_thread_id(),
1207 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1208 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
1209 lock_reference_id_generator_->get_next_reference_id());
1210 return invoke_and_get_future<bool>(request, key);
1213 boost::future<protocol::ClientMessage> IMapImpl::unlock(
const serialization::pimpl::data &key) {
1214 auto request = protocol::codec::map_unlock_encode(get_name(), key, util::get_current_thread_id(),
1215 lock_reference_id_generator_->get_next_reference_id());
1216 return invoke_on_partition(request, get_partition_id(key));
1219 boost::future<protocol::ClientMessage> IMapImpl::force_unlock(
const serialization::pimpl::data &key) {
1220 auto request = protocol::codec::map_forceunlock_encode(get_name(), key,
1221 lock_reference_id_generator_->get_next_reference_id());
1222 return invoke_on_partition(request, get_partition_id(key));
1225 boost::future<std::string> IMapImpl::add_interceptor(
const serialization::pimpl::data &interceptor) {
1226 auto request = protocol::codec::map_addinterceptor_encode(get_name(), interceptor);
1227 return invoke_and_get_future<std::string>(request);
1231 boost::future<boost::uuids::uuid> IMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
bool include_value, int32_t listener_flags) {
1232 return register_listener(create_map_entry_listener_codec(include_value, listener_flags), std::move(entry_event_handler));
1235 boost::future<boost::uuids::uuid>
1236 IMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1237 serialization::pimpl::data &&predicate,
bool include_value,
1238 int32_t listener_flags) {
1239 return register_listener(
1240 create_map_entry_listener_codec(include_value, std::move(predicate), listener_flags),
1241 std::move(entry_event_handler));
1244 boost::future<bool> IMapImpl::remove_entry_listener(boost::uuids::uuid registration_id) {
1245 return get_context().get_client_listener_service().deregister_listener(registration_id);
1248 boost::future<boost::uuids::uuid>
1249 IMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1250 bool include_value, serialization::pimpl::data &&key, int32_t listener_flags) {
1251 return register_listener(create_map_entry_listener_codec(include_value, listener_flags, std::move(key)),
1252 std::move(entry_event_handler));
1255 boost::future<boost::optional<map::data_entry_view>> IMapImpl::get_entry_view_data(
const serialization::pimpl::data &key) {
1256 auto request = protocol::codec::map_getentryview_encode(get_name(), key,
1257 util::get_current_thread_id());
1258 return invoke_and_get_future<boost::optional<map::data_entry_view>>(request, key);
1261 boost::future<bool> IMapImpl::evict(
const serialization::pimpl::data &key) {
1262 auto request = protocol::codec::map_evict_encode(get_name(), key, util::get_current_thread_id());
1263 return invoke_and_get_future<bool>(request, key);
1266 boost::future<protocol::ClientMessage> IMapImpl::evict_all() {
1267 auto request = protocol::codec::map_evictall_encode(get_name());
1268 return invoke(request);
1271 boost::future<EntryVector>
1272 IMapImpl::get_all_data(
int partition_id,
const std::vector<serialization::pimpl::data> &keys) {
1273 auto request = protocol::codec::map_getall_encode(get_name(), keys);
1274 return invoke_and_get_future<EntryVector>(request, partition_id);
1277 boost::future<std::vector<serialization::pimpl::data>> IMapImpl::key_set_data() {
1278 auto request = protocol::codec::map_keyset_encode(get_name());
1279 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1282 boost::future<std::vector<serialization::pimpl::data>> IMapImpl::key_set_data(
const serialization::pimpl::data &predicate) {
1283 auto request = protocol::codec::map_keysetwithpredicate_encode(get_name(), predicate);
1284 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1287 boost::future<std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>> IMapImpl::key_set_for_paging_predicate_data(
1288 protocol::codec::holder::paging_predicate_holder
const & predicate) {
1289 auto request = protocol::codec::map_keysetwithpagingpredicate_encode(get_name(), predicate);
1290 return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1291 return get_paging_predicate_response<std::vector<serialization::pimpl::data>>(std::move(f));
1295 boost::future<EntryVector> IMapImpl::entry_set_data() {
1296 auto request = protocol::codec::map_entryset_encode(get_name());
1297 return invoke_and_get_future<EntryVector>(request);
1300 boost::future<EntryVector> IMapImpl::entry_set_data(
const serialization::pimpl::data &predicate) {
1301 auto request = protocol::codec::map_entrieswithpredicate_encode(get_name(), predicate);
1302 return invoke_and_get_future<EntryVector>(request);
1305 boost::future<std::pair<EntryVector, query::anchor_data_list>> IMapImpl::entry_set_for_paging_predicate_data(
1306 protocol::codec::holder::paging_predicate_holder
const & predicate) {
1307 auto request = protocol::codec::map_entrieswithpagingpredicate_encode(get_name(), predicate);
1308 return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1309 return get_paging_predicate_response<EntryVector>(std::move(f));
1313 boost::future<std::vector<serialization::pimpl::data>> IMapImpl::values_data() {
1314 auto request = protocol::codec::map_values_encode(get_name());
1315 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1318 boost::future<std::vector<serialization::pimpl::data>> IMapImpl::values_data(
const serialization::pimpl::data &predicate) {
1319 auto request = protocol::codec::map_valueswithpredicate_encode(get_name(), predicate);
1320 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1323 boost::future<std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1324 IMapImpl::values_for_paging_predicate_data(protocol::codec::holder::paging_predicate_holder
const & predicate) {
1325 auto request = protocol::codec::map_valueswithpagingpredicate_encode(get_name(), predicate);
1326 return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1327 return get_paging_predicate_response<std::vector<serialization::pimpl::data>>(std::move(f));
1331 boost::future<protocol::ClientMessage> IMapImpl::add_index_data(
const config::index_config &config) {
1332 auto request = protocol::codec::map_addindex_encode(get_name(), config);
1333 return invoke(request);
1336 boost::future<int> IMapImpl::size() {
1337 auto request = protocol::codec::map_size_encode(get_name());
1338 return invoke_and_get_future<int>(request);
1341 boost::future<bool> IMapImpl::is_empty() {
1342 auto request = protocol::codec::map_isempty_encode(get_name());
1343 return invoke_and_get_future<bool>(request);
1346 boost::future<protocol::ClientMessage> IMapImpl::put_all_data(
int partition_id,
const EntryVector &entries) {
1347 auto request = protocol::codec::map_putall_encode(get_name(), entries,
true);
1348 return invoke_on_partition(request, partition_id);
1351 boost::future<protocol::ClientMessage> IMapImpl::clear_data() {
1352 auto request = protocol::codec::map_clear_encode(get_name());
1353 return invoke(request);
// Encodes a map "execute on key" request and invokes it on the key's partition; the
// response is decoded as an optional serialized value.
// NOTE(review): the middle arguments of the encode call (presumably processor and key) are
// on lines not visible in this excerpt - confirm against the full file.
1356 boost::future<boost::optional<serialization::pimpl::data>>
1357 IMapImpl::execute_on_key_data(
const serialization::pimpl::data &key,
1358 const serialization::pimpl::data &processor) {
1359 auto request = protocol::codec::map_executeonkey_encode(get_name(),
1362 util::get_current_thread_id());
1363 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, get_partition_id(key));
// Encodes a map "submit to key" request, invokes it on the key's partition and, in a
// synchronously launched continuation, reads a nullable serialized value from the response.
// NOTE(review): the middle arguments of the encode call and the lines that define msg inside
// the continuation are not visible in this excerpt - confirm against the full file.
1366 boost::future<boost::optional<serialization::pimpl::data>>
1367 IMapImpl::submit_to_key_data(
const serialization::pimpl::data &key,
1368 const serialization::pimpl::data &processor) {
1369 auto request = protocol::codec::map_submittokey_encode(get_name(),
1372 util::get_current_thread_id());
1373 return invoke_on_partition(request, get_partition_id(key)).then(boost::launch::sync,
1374 [](boost::future<protocol::ClientMessage> f) {
1377 return msg.get_nullable<serialization::pimpl::data>();
1381 boost::future<EntryVector> IMapImpl::execute_on_keys_data(
const std::vector<serialization::pimpl::data> &keys,
1382 const serialization::pimpl::data &processor) {
1383 auto request = protocol::codec::map_executeonkeys_encode(get_name(), processor, keys);
1384 return invoke_and_get_future<EntryVector>(request);
1387 boost::future<protocol::ClientMessage> IMapImpl::remove_interceptor(
const std::string &
id) {
1388 auto request = protocol::codec::map_removeinterceptor_encode(get_name(),
id);
1389 return invoke(request);
1392 boost::future<EntryVector> IMapImpl::execute_on_entries_data(
const serialization::pimpl::data &entry_processor) {
1393 auto request = protocol::codec::map_executeonallkeys_encode(
1394 get_name(), entry_processor);
1395 return invoke_and_get_future<EntryVector>(request);
// Encodes a map "execute with predicate" request and invokes it without an explicit target.
// NOTE(review): the remaining arguments of the encode call (presumably entry_processor and
// predicate) are on lines not visible in this excerpt - confirm against the full file.
1399 boost::future<EntryVector>
1400 IMapImpl::execute_on_entries_data(
const serialization::pimpl::data &entry_processor,
const serialization::pimpl::data &predicate) {
1401 auto request = protocol::codec::map_executewithpredicate_encode(get_name(),
1404 return invoke_and_get_future<EntryVector>(request);
1408 std::shared_ptr<spi::impl::ListenerMessageCodec>
1409 IMapImpl::create_map_entry_listener_codec(
bool include_value, serialization::pimpl::data &&predicate,
1410 int32_t listener_flags) {
1411 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1412 new MapEntryListenerWithPredicateMessageCodec(get_name(), include_value, listener_flags, std::move(predicate)));
1415 std::shared_ptr<spi::impl::ListenerMessageCodec>
1416 IMapImpl::create_map_entry_listener_codec(
bool include_value, int32_t listener_flags) {
1417 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1418 new MapEntryListenerMessageCodec(get_name(), include_value, listener_flags));
1421 std::shared_ptr<spi::impl::ListenerMessageCodec>
1422 IMapImpl::create_map_entry_listener_codec(
bool include_value, int32_t listener_flags,
1423 serialization::pimpl::data &&key) {
1424 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1425 new MapEntryListenerToKeyCodec(get_name(), include_value, listener_flags, std::move(key)));
1428 void IMapImpl::on_initialize() {
1429 ProxyImpl::on_initialize();
1430 lock_reference_id_generator_ = get_context().get_lock_reference_id_generator();
// Codec constructor: takes the map name by value and moves it into name_.
// NOTE(review): one parameter line (between name and listener_flags), the rest of the
// initializer list and the body are not visible in this excerpt - confirm against the full file.
1433 IMapImpl::MapEntryListenerMessageCodec::MapEntryListenerMessageCodec(std::string name,
1435 int32_t listener_flags) : name_(std::move(name)),
// Builds the map "add entry listener" message from name_, include_value_ and listener_flags_
// (cast to int32_t).
// NOTE(review): the final argument(s) of the encode call (presumably local_only) are on a
// line not visible in this excerpt - confirm against the full file.
1441 protocol::ClientMessage
1442 IMapImpl::MapEntryListenerMessageCodec::encode_add_request(
bool local_only)
const {
1443 return protocol::codec::map_addentrylistener_encode(name_, include_value_,
1444 static_cast<int32_t
>(listener_flags_),
1448 protocol::ClientMessage
1449 IMapImpl::MapEntryListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id)
const {
1450 return protocol::codec::map_removeentrylistener_encode(name_, real_registration_id);
1453 protocol::ClientMessage
1454 IMapImpl::MapEntryListenerToKeyCodec::encode_add_request(
bool local_only)
const {
1455 return protocol::codec::map_addentrylistenertokey_encode(name_, key_, include_value_,
1456 static_cast<int32_t
>(listener_flags_), local_only);
1459 protocol::ClientMessage
1460 IMapImpl::MapEntryListenerToKeyCodec::encode_remove_request(boost::uuids::uuid real_registration_id)
const {
1461 return protocol::codec::map_removeentrylistener_encode(name_, real_registration_id);
1464 IMapImpl::MapEntryListenerToKeyCodec::MapEntryListenerToKeyCodec(std::string name,
bool include_value,
1465 int32_t listener_flags,
1466 serialization::pimpl::data key)
1467 : name_(std::move(name)), include_value_(include_value), listener_flags_(listener_flags), key_(std::move(key)) {}
1469 IMapImpl::MapEntryListenerWithPredicateMessageCodec::MapEntryListenerWithPredicateMessageCodec(
1470 std::string name,
bool include_value, int32_t listener_flags,
1471 serialization::pimpl::data &&predicate) : name_(std::move(name)), include_value_(include_value),
1472 listener_flags_(listener_flags), predicate_(std::move(predicate)) {}
// Builds the map "add entry listener with predicate" message from name_, predicate_,
// listener_flags_ (cast to int32_t) and the supplied local_only flag.
// NOTE(review): one argument line of the encode call (presumably include_value_) is not
// visible in this excerpt - confirm against the full file.
1474 protocol::ClientMessage
1475 IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_add_request(
bool local_only)
const {
1476 return protocol::codec::map_addentrylistenerwithpredicate_encode(name_, predicate_,
1478 static_cast<int32_t
>(listener_flags_), local_only);
1481 protocol::ClientMessage
1482 IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_remove_request(
1483 boost::uuids::uuid real_registration_id)
const {
1484 return protocol::codec::map_removeentrylistener_encode(name_, real_registration_id);
1487 TransactionalQueueImpl::TransactionalQueueImpl(
const std::string &name,
1488 txn::TransactionProxy &transaction_proxy)
1489 : TransactionalObject(iqueue::SERVICE_NAME, name, transaction_proxy) {}
1491 boost::future<bool> TransactionalQueueImpl::offer(
const serialization::pimpl::data &e, std::chrono::milliseconds timeout) {
1492 auto request = protocol::codec::transactionalqueue_offer_encode(
1493 get_name(), get_transaction_id(), util::get_current_thread_id(), e, std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1495 return invoke_and_get_future<bool>(request);
1498 boost::future<boost::optional<serialization::pimpl::data>> TransactionalQueueImpl::poll_data(std::chrono::milliseconds timeout) {
1499 auto request = protocol::codec::transactionalqueue_poll_encode(
1500 get_name(), get_transaction_id(), util::get_current_thread_id(), std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1502 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request);
1505 boost::future<int> TransactionalQueueImpl::size() {
1506 auto request = protocol::codec::transactionalqueue_size_encode(
1507 get_name(), get_transaction_id(), util::get_current_thread_id());
1509 return invoke_and_get_future<int>(request);
// Constructs the set proxy under iset::SERVICE_NAME and caches partition_id_, computed from
// a serialized std::string.
// NOTE(review): the argument passed to to_data<std::string>(...) is on a line that is not
// visible in this excerpt (presumably the instance name) - confirm against the full file.
1512 ISetImpl::ISetImpl(
const std::string &instance_name, spi::ClientContext *client_context)
1513 : ProxyImpl(iset::SERVICE_NAME, instance_name, client_context) {
1514 serialization::pimpl::data key_data = get_context().get_serialization_service().to_data<std::string>(
1516 partition_id_ = get_partition_id(key_data);
1519 boost::future<bool> ISetImpl::remove_item_listener(boost::uuids::uuid registration_id) {
1520 return get_context().get_client_listener_service().deregister_listener(registration_id);
1523 boost::future<int> ISetImpl::size() {
1524 auto request = protocol::codec::set_size_encode(get_name());
1525 return invoke_and_get_future<int>(request, partition_id_);
1528 boost::future<bool> ISetImpl::is_empty() {
1529 auto request = protocol::codec::set_isempty_encode(get_name());
1530 return invoke_and_get_future<bool>(request, partition_id_);
1533 boost::future<bool> ISetImpl::contains(
const serialization::pimpl::data &element) {
1534 auto request = protocol::codec::set_contains_encode(get_name(), element);
1535 return invoke_and_get_future<bool>(request, partition_id_);
1538 boost::future<std::vector<serialization::pimpl::data>> ISetImpl::to_array_data() {
1539 auto request = protocol::codec::set_getall_encode(get_name());
1540 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
1543 boost::future<bool> ISetImpl::add(
const serialization::pimpl::data &element) {
1544 auto request = protocol::codec::set_add_encode(get_name(), element);
1545 return invoke_and_get_future<bool>(request, partition_id_);
1548 boost::future<bool> ISetImpl::remove(
const serialization::pimpl::data &element) {
1549 auto request = protocol::codec::set_remove_encode(get_name(), element);
1550 return invoke_and_get_future<bool>(request, partition_id_);
1553 boost::future<bool> ISetImpl::contains_all(
const std::vector<serialization::pimpl::data> &elements) {
1554 auto request = protocol::codec::set_containsall_encode(get_name(), elements);
1555 return invoke_and_get_future<bool>(request, partition_id_);
1558 boost::future<bool> ISetImpl::add_all(
const std::vector<serialization::pimpl::data> &elements) {
1559 auto request = protocol::codec::set_addall_encode(get_name(), elements);
1560 return invoke_and_get_future<bool>(request, partition_id_);
1563 boost::future<bool> ISetImpl::remove_all(
const std::vector<serialization::pimpl::data> &elements) {
1564 auto request = protocol::codec::set_compareandremoveall_encode(get_name(), elements);
1565 return invoke_and_get_future<bool>(request, partition_id_);
1568 boost::future<bool> ISetImpl::retain_all(
const std::vector<serialization::pimpl::data> &elements) {
1569 auto request = protocol::codec::set_compareandretainall_encode(get_name(), elements);
1570 return invoke_and_get_future<bool>(request, partition_id_);
1573 boost::future<void> ISetImpl::clear() {
1574 auto request = protocol::codec::set_clear_encode(get_name());
1575 return to_void_future(invoke_on_partition(request, partition_id_));
1578 std::shared_ptr<spi::impl::ListenerMessageCodec>
1579 ISetImpl::create_item_listener_codec(
bool include_value) {
1580 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1581 new SetListenerMessageCodec(get_name(), include_value));
1584 ISetImpl::SetListenerMessageCodec::SetListenerMessageCodec(std::string name,
bool include_value)
1585 : name_(std::move(name)), include_value_(include_value) {}
1587 protocol::ClientMessage
1588 ISetImpl::SetListenerMessageCodec::encode_add_request(
bool local_only)
const {
1589 return protocol::codec::set_addlistener_encode(name_, include_value_, local_only);
1592 protocol::ClientMessage
1593 ISetImpl::SetListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id)
const {
1594 return protocol::codec::set_removelistener_encode(name_, real_registration_id);
1597 ITopicImpl::ITopicImpl(
const std::string &instance_name, spi::ClientContext *context)
1598 : proxy::ProxyImpl(
"hz:impl:topicService", instance_name, context),
1599 partition_id_(get_partition_id(to_data(instance_name))) {}
1601 boost::future<void> ITopicImpl::publish(
const serialization::pimpl::data &data) {
1602 auto request = protocol::codec::topic_publish_encode(get_name(), data);
1603 return to_void_future(invoke_on_partition(request, partition_id_));
1606 boost::future<boost::uuids::uuid> ITopicImpl::add_message_listener(std::shared_ptr<impl::BaseEventHandler> topic_event_handler) {
1607 return register_listener(create_item_listener_codec(), std::move(topic_event_handler));
1610 boost::future<bool> ITopicImpl::remove_message_listener(boost::uuids::uuid registration_id) {
1611 return get_context().get_client_listener_service().deregister_listener(registration_id);
1614 std::shared_ptr<spi::impl::ListenerMessageCodec> ITopicImpl::create_item_listener_codec() {
1615 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
new TopicListenerMessageCodec(get_name()));
1618 ITopicImpl::TopicListenerMessageCodec::TopicListenerMessageCodec(std::string name) : name_(std::move(name)) {}
1620 protocol::ClientMessage
1621 ITopicImpl::TopicListenerMessageCodec::encode_add_request(
bool local_only)
const {
1622 return protocol::codec::topic_addmessagelistener_encode(name_, local_only);
1625 protocol::ClientMessage
1626 ITopicImpl::TopicListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id)
const {
1627 return protocol::codec::topic_removemessagelistener_encode(name_, real_registration_id);
// Constructs the replicated map proxy: forwards service name and object name to
// ProxyImpl and initializes target_partition_id_ to -1.
// NOTE(review): the line closing the ProxyImpl(...) argument list is not visible in
// this listing — presumably it passes `context`; -1 looks like a "not yet chosen"
// sentinel, confirm against the code that reads target_partition_id_.
1630 ReplicatedMapImpl::ReplicatedMapImpl(
const std::string &service_name,
const std::string &object_name,
1631 spi::ClientContext *context) : ProxyImpl(service_name, object_name,
1633 target_partition_id_(-1) {}
// Namespace-scope definition of the static constexpr data member. Before C++17 such a
// definition is required if the member is odr-used; from C++17 on it is redundant.
1635 constexpr int32_t RingbufferImpl::MAX_BATCH_SIZE;
// get_key(): const accessor returning a reference to serialized key data.
// NOTE(review): body line not visible in this listing — presumably `return key_;`; confirm.
1639 const serialization::pimpl::data &data_entry_view::get_key()
const {
// get_value(): const accessor returning a reference to serialized value data.
// NOTE(review): body line not visible in this listing — presumably `return value_;`; confirm.
1643 const serialization::pimpl::data &data_entry_view::get_value()
const {
// get_cost(): const accessor returning an int64_t.
// NOTE(review): body line not visible in this listing — presumably `return cost_;`; confirm.
1647 int64_t data_entry_view::get_cost()
const {
1651 int64_t data_entry_view::get_creation_time()
const {
1652 return creation_time_;
1655 int64_t data_entry_view::get_expiration_time()
const {
1656 return expiration_time_;
// get_hits(): const accessor returning an int64_t.
// NOTE(review): body line not visible in this listing — presumably `return hits_;`; confirm.
1659 int64_t data_entry_view::get_hits()
const {
1663 int64_t data_entry_view::get_last_access_time()
const {
1664 return last_access_time_;
1667 int64_t data_entry_view::get_last_stored_time()
const {
1668 return last_stored_time_;
1671 int64_t data_entry_view::get_last_update_time()
const {
1672 return last_update_time_;
// get_version(): const accessor returning an int64_t.
// NOTE(review): body line not visible in this listing — presumably `return version_;`; confirm.
1675 int64_t data_entry_view::get_version()
const {
// get_ttl(): const accessor returning an int64_t.
// NOTE(review): body line not visible in this listing — presumably `return ttl_;`; confirm.
1679 int64_t data_entry_view::get_ttl()
const {
// get_max_idle(): const accessor returning an int64_t.
// NOTE(review): body line not visible in this listing — presumably `return max_idle_;`; confirm.
1683 int64_t data_entry_view::get_max_idle()
const {
// Constructs an entry view: key and value are taken as rvalues and moved into the
// members; every visible numeric argument is copied into the member of the same name.
// NOTE(review): some parameter/initializer lines are not visible in this listing
// (a `cost` parameter is declared but no cost_ initializer is shown, and a ttl
// parameter/initializer is presumably among the missing lines) — confirm against the full file.
1687 data_entry_view::data_entry_view(serialization::pimpl::data &&key, serialization::pimpl::data &&value,
1688 int64_t cost, int64_t creation_time,
1689 int64_t expiration_time, int64_t hits, int64_t last_access_time,
1690 int64_t last_stored_time, int64_t last_update_time, int64_t version,
1692 int64_t max_idle) : key_(std::move(key)), value_(std::move(value)),
1694 creation_time_(creation_time),
1695 expiration_time_(expiration_time),
1696 hits_(hits), last_access_time_(last_access_time),
1697 last_stored_time_(last_stored_time),
1698 last_update_time_(last_update_time), version_(version),
1700 max_idle_(max_idle) {}
1705 namespace reliable {
1706 ReliableTopicMessage::ReliableTopicMessage() : publish_time_(std::chrono::system_clock::now()) {}
// Creates a message stamped with the current system-clock time, moving the payload
// into payload_ and copying the pointed-to address into publisher_address_.
// NOTE(review): the line between the initializer list and the assignment is not
// visible in this listing. If no null check precedes it, `*address` dereferences a
// possibly-null unique_ptr — confirm a guard such as `if (address)` exists.
1708 ReliableTopicMessage::ReliableTopicMessage(
1709 hazelcast::client::serialization::pimpl::data &&payload_data,
1710 std::unique_ptr<address> address)
1711 : publish_time_(std::chrono::system_clock::now())
1712 , payload_(std::move(payload_data)) {
1714 publisher_address_ = boost::make_optional(*address);
1718 std::chrono::system_clock::time_point ReliableTopicMessage::get_publish_time()
const {
1719 return publish_time_;
1722 const boost::optional<address> &ReliableTopicMessage::get_publisher_address()
const {
1723 return publisher_address_;
// get_payload(): non-const accessor returning a mutable reference to serialized data.
// NOTE(review): body line not visible in this listing — presumably `return payload_;`; confirm.
1726 serialization::pimpl::data &ReliableTopicMessage::get_payload() {
1733 namespace serialization {
// get_factory_id(): returns the int32_t factory id used when serializing ReliableTopicMessage.
// NOTE(review): body line not visible in this listing — the returned constant cannot be confirmed here.
1734 int32_t hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_factory_id() {
1738 int hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_class_id() {
1739 return RELIABLE_TOPIC_MESSAGE;
1742 void hz_serializer<topic::impl::reliable::ReliableTopicMessage>::write_data(
1743 const topic::impl::reliable::ReliableTopicMessage &
object, object_data_output &out) {
1744 out.write<int64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(
object.publish_time_.time_since_epoch()).count());
1745 out.write_object(
object.publisher_address_);
1746 out.write(
object.payload_.to_byte_array());
// Reads a ReliableTopicMessage in the order publish time, publisher address, payload bytes.
// NOTE(review): the return statement is not visible in this listing — presumably
// `return message;`; confirm against the full file.
1749 topic::impl::reliable::ReliableTopicMessage
1750 hz_serializer<topic::impl::reliable::ReliableTopicMessage>::read_data(object_data_input &in) {
1751 topic::impl::reliable::ReliableTopicMessage message;
1752 auto now = std::chrono::system_clock::now();
// now + millis - now.time_since_epoch() cancels `now`, leaving the time point that is
// `millis` milliseconds after the clock's epoch.
1753 message.publish_time_ = now + std::chrono::milliseconds(in.read<int64_t>()) - now.time_since_epoch();
1754 message.publisher_address_ = in.read_object<address>();
// .value() is called on the result of reading the byte vector.
// NOTE(review): if that result is an empty optional this would throw — confirm intended.
1755 message.payload_ = serialization::pimpl::data(in.read<std::vector<byte>>().value());
1760 entry_event::entry_event(
const std::string &name, member &&member, type event_type,
1761 typed_data &&key, typed_data &&value, typed_data &&old_value, typed_data &&merging_value)
1762 : name_(name), member_(std::move(member)), event_type_(event_type), key_(std::move(key)),
1763 value_(std::move(value)), old_value_(std::move(old_value)), merging_value_(std::move(merging_value)) {}
1778 return merging_value_;
// Streams a textual summary of an entry_event: name, member, the event type as an int,
// and get_type() of key, value, old value and merging value (the type descriptors,
// not the data itself).
// NOTE(review): the return statement is not visible in this listing — presumably
// `return os;`; confirm against the full file.
1793 std::ostream &operator<<(std::ostream &os,
const entry_event &event) {
1794 os <<
"name: " <<
event.name_ <<
" member: " <<
event.member_ <<
" eventType: " <<
1795 static_cast<int>(
event.event_type_) <<
" key: " << event.key_.
get_type() <<
" value: " <<
event.value_.get_type() <<
1796 " oldValue: " <<
event.old_value_.get_type() <<
" mergingValue: " <<
event.merging_value_.get_type();
// Tail of the map_event constructor: stores member, event type, name and the number
// of affected entries.
// NOTE(review): the first line of the signature is not visible in this listing. The
// signature listed elsewhere takes `member &&member`; if so, `member_(member)` copies
// the argument instead of moving it — consider std::move(member) after confirming.
1801 int number_of_entries_affected)
1802 : member_(
member), event_type_(event_type), name_(name), number_of_entries_affected_(number_of_entries_affected) {}
1817 return number_of_entries_affected_;
// Streams a textual summary of a map_event: member, the event type as an int, name and
// the number of affected entries.
// NOTE(review): the text opens with "MapEvent{" but the visible statement never writes a
// closing '}'; also the return statement is not visible in this listing — presumably
// `return os;`. Confirm both against the full file.
1820 std::ostream &operator<<(std::ostream &os,
const map_event &event) {
1821 os <<
"MapEvent{member: " <<
event.member_ <<
" eventType: " <<
static_cast<int>(
event.event_type_) <<
" name: " << event.name_
1822 <<
" numberOfEntriesAffected: " << event.number_of_entries_affected_;
1826 item_event_base::item_event_base(
const std::string &name,
const member &member,
const item_event_type &event_type)
1827 : name_(name), member_(member), event_type_(event_type) {}
1841 item_event_base::~item_event_base() =
default;
1843 flake_id_generator::flake_id_generator(
const std::string &object_name, spi::ClientContext *context)
1844 : flake_id_generator_impl(SERVICE_NAME, object_name, context) {}
const typed_data & get_key() const
Returns the key of the entry event.
const std::string & get_name() const
Returns the name of the map for this event.
const typed_data & get_old_value() const
Returns the old value of the entry event.
type get_event_type() const
Return the event type.
const member & get_member() const
Returns the member that fired this event.
const typed_data & get_value() const
Returns the value of the entry event.
const typed_data & get_merging_value() const
Returns the incoming merging value of the entry event.
static constexpr std::chrono::milliseconds UNSET
Default TTL value of a record.
item_event_type get_event_type() const
Return the event type.
const member & get_member() const
Returns the member that fired this event.
const std::string & get_name() const
Returns the name of the collection for this event.
Map events common contract.
const std::string & get_name() const
Returns the name of the map for this event.
entry_event::type get_event_type() const
Return the event type.
int get_number_of_entries_affected() const
Returns the number of entries affected by this event.
map_event(member &&member, entry_event::type event_type, const std::string &name, int number_of_entries_affected)
Constructor.
const member & get_member() const
Returns the member that fired this event.
typed_data class is a wrapper class for the serialized binary data.
serialization::pimpl::object_type get_type() const