17 #include <unordered_set>
20 #include "hazelcast/client/impl/ClientLockReferenceIdGenerator.h"
21 #include "hazelcast/client/proxy/PNCounterImpl.h"
22 #include "hazelcast/client/spi/ClientContext.h"
23 #include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
24 #include "hazelcast/client/proxy/flake_id_generator_impl.h"
25 #include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
26 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
27 #include "hazelcast/client/client_config.h"
28 #include "hazelcast/client/map/data_entry_view.h"
29 #include "hazelcast/client/proxy/RingbufferImpl.h"
30 #include "hazelcast/client/impl/vector_clock.h"
31 #include "hazelcast/client/internal/partition/strategy/StringPartitioningStrategy.h"
32 #include "hazelcast/util/Util.h"
33 #include "hazelcast/client/topic/reliable_listener.h"
39 reliable_topic::reliable_topic(
const std::string &instance_name, spi::ClientContext *context)
40 : proxy::ProxyImpl(reliable_topic::SERVICE_NAME, instance_name, context),
41 executor_(context->get_client_execution_service().get_user_executor()), logger_(context->get_logger()) {
42 auto reliable_config = context->get_client_config().lookup_reliable_topic_config(instance_name);
43 if (reliable_config) {
44 batch_size_ = reliable_config->get_read_batch_size();
46 batch_size_ = config::reliable_topic_config::DEFAULT_READ_BATCH_SIZE;
49 ringbuffer_ = context->get_hazelcast_client_implementation()->get_distributed_object<ringbuffer>(
50 std::string(reliable_topic::TOPIC_RB_PREFIX) + instance_name);
53 bool reliable_topic::remove_message_listener(
const std::string ®istration_id) {
54 int id = util::IOUtil::to_value<int>(registration_id);
55 auto runner = runners_map_.get(
id);
63 void reliable_topic::on_shutdown() {
65 for (
auto &entry : runners_map_.clear()) {
66 entry.second->cancel();
70 void reliable_topic::on_destroy() {
72 for (
auto &entry : runners_map_.clear()) {
73 entry.second->cancel();
77 void reliable_topic::post_destroy() {
79 ringbuffer_.get()->destroy().get();
83 reliable_listener::reliable_listener(
bool loss_tolerant, int64_t initial_sequence_id)
84 : loss_tolerant_(loss_tolerant)
85 , initial_sequence_id_(initial_sequence_id) {}
89 ClientLockReferenceIdGenerator::ClientLockReferenceIdGenerator() : reference_id_counter_(0) {}
91 int64_t ClientLockReferenceIdGenerator::get_next_reference_id() {
92 return ++reference_id_counter_;
97 MultiMapImpl::MultiMapImpl(
const std::string &instance_name, spi::ClientContext *context)
98 : ProxyImpl(multi_map::SERVICE_NAME, instance_name, context) {
100 lock_reference_id_generator_ = get_context().get_lock_reference_id_generator();
103 boost::future<bool> MultiMapImpl::put(
const serialization::pimpl::data &key,
const serialization::pimpl::data &value) {
104 auto request = protocol::codec::multimap_put_encode(get_name(), key, value,
105 util::get_current_thread_id());
106 return invoke_and_get_future<bool>(request, key);
109 boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::get_data(
const serialization::pimpl::data &key) {
110 auto request = protocol::codec::multimap_get_encode(get_name(), key,
111 util::get_current_thread_id());
112 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, key);
115 boost::future<bool> MultiMapImpl::remove(
const serialization::pimpl::data &key,
const serialization::pimpl::data &value) {
116 auto request = protocol::codec::multimap_removeentry_encode(get_name(), key, value,
117 util::get_current_thread_id());
118 return invoke_and_get_future<bool>(request, key);
121 boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::remove_data(
const serialization::pimpl::data &key) {
122 auto request = protocol::codec::multimap_remove_encode(get_name(), key,
123 util::get_current_thread_id());
124 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, key);
127 boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::key_set_data() {
128 auto request = protocol::codec::multimap_keyset_encode(get_name());
129 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
132 boost::future<std::vector<serialization::pimpl::data>> MultiMapImpl::values_data() {
133 auto request = protocol::codec::multimap_values_encode(get_name());
134 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
137 boost::future<EntryVector> MultiMapImpl::entry_set_data() {
138 auto request = protocol::codec::multimap_entryset_encode(get_name());
139 return invoke_and_get_future<EntryVector>(request);
142 boost::future<bool> MultiMapImpl::contains_key(
const serialization::pimpl::data &key) {
143 auto request = protocol::codec::multimap_containskey_encode(get_name(), key,
144 util::get_current_thread_id());
145 return invoke_and_get_future<bool>(request, key);
148 boost::future<bool> MultiMapImpl::contains_value(
const serialization::pimpl::data &value) {
149 auto request = protocol::codec::multimap_containsvalue_encode(get_name(), value);
150 return invoke_and_get_future<bool>(request);
153 boost::future<bool> MultiMapImpl::contains_entry(
const serialization::pimpl::data &key,
154 const serialization::pimpl::data &value) {
155 auto request = protocol::codec::multimap_containsentry_encode(get_name(), key, value,
156 util::get_current_thread_id());
157 return invoke_and_get_future<bool>(request, key);
160 boost::future<int> MultiMapImpl::size() {
161 auto request = protocol::codec::multimap_size_encode(get_name());
162 return invoke_and_get_future<int>(request);
165 boost::future<void> MultiMapImpl::clear() {
166 auto request = protocol::codec::multimap_clear_encode(get_name());
167 return to_void_future(invoke(request));
170 boost::future<int> MultiMapImpl::value_count(
const serialization::pimpl::data &key) {
171 auto request = protocol::codec::multimap_valuecount_encode(get_name(), key,
172 util::get_current_thread_id());
173 return invoke_and_get_future<int>(request, key);
176 boost::future<boost::uuids::uuid>
177 MultiMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
178 bool include_value) {
179 return register_listener(create_multi_map_entry_listener_codec(include_value), std::move(entry_event_handler));
182 boost::future<boost::uuids::uuid>
183 MultiMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
184 bool include_value, serialization::pimpl::data &&key) {
185 return register_listener(create_multi_map_entry_listener_codec(include_value, std::move(key)),
186 std::move(entry_event_handler));
189 boost::future<bool> MultiMapImpl::remove_entry_listener(boost::uuids::uuid registration_id) {
190 return get_context().get_client_listener_service().deregister_listener(registration_id);
193 boost::future<void> MultiMapImpl::lock(
const serialization::pimpl::data &key) {
194 return lock(key, std::chrono::milliseconds(-1));
197 boost::future<void> MultiMapImpl::lock(
const serialization::pimpl::data &key, std::chrono::milliseconds lease_time) {
198 auto request = protocol::codec::multimap_lock_encode(get_name(), key, util::get_current_thread_id(),
199 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
200 lock_reference_id_generator_->get_next_reference_id());
201 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
204 boost::future<bool> MultiMapImpl::is_locked(
const serialization::pimpl::data &key) {
205 auto request = protocol::codec::multimap_islocked_encode(get_name(), key);
206 return invoke_and_get_future<bool>(request, key);
209 boost::future<bool> MultiMapImpl::try_lock(
const serialization::pimpl::data &key) {
210 auto request = protocol::codec::multimap_trylock_encode(get_name(), key,
211 util::get_current_thread_id(), INT64_MAX,
213 lock_reference_id_generator_->get_next_reference_id());
214 return invoke_and_get_future<bool>(request, key);
217 boost::future<bool> MultiMapImpl::try_lock(
const serialization::pimpl::data &key, std::chrono::milliseconds timeout) {
218 return try_lock(key, timeout, std::chrono::milliseconds(INT64_MAX));
221 boost::future<bool> MultiMapImpl::try_lock(
const serialization::pimpl::data &key, std::chrono::milliseconds timeout, std::chrono::milliseconds lease_time) {
222 auto request = protocol::codec::multimap_trylock_encode(get_name(), key, util::get_current_thread_id(),
223 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
224 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
225 lock_reference_id_generator_->get_next_reference_id());
226 return invoke_and_get_future<bool>(request, key);
229 boost::future<void> MultiMapImpl::unlock(
const serialization::pimpl::data &key) {
230 auto request = protocol::codec::multimap_unlock_encode(get_name(), key, util::get_current_thread_id(),
231 lock_reference_id_generator_->get_next_reference_id());
232 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
235 boost::future<void> MultiMapImpl::force_unlock(
const serialization::pimpl::data &key) {
236 auto request = protocol::codec::multimap_forceunlock_encode(get_name(), key,
237 lock_reference_id_generator_->get_next_reference_id());
238 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
241 std::shared_ptr<spi::impl::ListenerMessageCodec>
242 MultiMapImpl::create_multi_map_entry_listener_codec(
bool include_value) {
243 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
244 new MultiMapEntryListenerMessageCodec(get_name(), include_value));
247 std::shared_ptr<spi::impl::ListenerMessageCodec>
248 MultiMapImpl::create_multi_map_entry_listener_codec(
bool include_value, serialization::pimpl::data &&key) {
249 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
250 new MultiMapEntryListenerToKeyCodec(get_name(), include_value, std::move(key)));
253 void MultiMapImpl::on_initialize() {
254 ProxyImpl::on_initialize();
255 lock_reference_id_generator_ = get_context().get_lock_reference_id_generator();
258 MultiMapImpl::MultiMapEntryListenerMessageCodec::MultiMapEntryListenerMessageCodec(std::string name,
260 : name_(std::move(name)), include_value_(include_value) {}
262 protocol::ClientMessage
263 MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_add_request(
bool local_only)
const {
264 return protocol::codec::multimap_addentrylistener_encode(name_, include_value_, local_only);
267 protocol::ClientMessage
268 MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_remove_request(
269 boost::uuids::uuid real_registration_id)
const {
270 return protocol::codec::multimap_removeentrylistener_encode(name_, real_registration_id);
273 protocol::ClientMessage
274 MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_add_request(
bool local_only)
const {
275 return protocol::codec::multimap_addentrylistenertokey_encode(name_, key_, include_value_,
279 protocol::ClientMessage
280 MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_remove_request(
281 boost::uuids::uuid real_registration_id)
const {
282 return protocol::codec::multimap_removeentrylistener_encode(name_, real_registration_id);
285 MultiMapImpl::MultiMapEntryListenerToKeyCodec::MultiMapEntryListenerToKeyCodec(std::string name,
287 serialization::pimpl::data &&key)
288 : name_(std::move(name)), include_value_(include_value), key_(std::move(key)) {}
290 const std::shared_ptr<std::unordered_set<member> > PNCounterImpl::EMPTY_ADDRESS_LIST(
291 new std::unordered_set<member>());
293 PNCounterImpl::PNCounterImpl(
const std::string &service_name,
const std::string &object_name,
294 spi::ClientContext *context)
295 : ProxyImpl(service_name, object_name, context), max_configured_replica_count_(0),
296 observed_clock_(std::shared_ptr<impl::vector_clock>(new impl::vector_clock())),
297 logger_(context->get_logger()) {
300 std::ostream &operator<<(std::ostream &os,
const PNCounterImpl &proxy) {
301 os <<
"PNCounter{name='" << proxy.get_name() <<
"\'}";
305 boost::future<int64_t> PNCounterImpl::get() {
306 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
308 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
"ClientPNCounterProxy::get",
309 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
311 return invoke_get_internal(EMPTY_ADDRESS_LIST,
nullptr, target);
314 boost::future<int64_t> PNCounterImpl::get_and_add(int64_t delta) {
315 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
317 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
"ClientPNCounterProxy::getAndAdd",
318 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
320 return invoke_add_internal(delta,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
323 boost::future<int64_t> PNCounterImpl::add_and_get(int64_t delta) {
324 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
326 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
"ClientPNCounterProxy::addAndGet",
327 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
329 return invoke_add_internal(delta,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
332 boost::future<int64_t> PNCounterImpl::get_and_subtract(int64_t delta) {
333 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
335 BOOST_THROW_EXCEPTION(
336 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::getAndSubtract",
337 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
339 return invoke_add_internal(-delta,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
343 boost::future<int64_t> PNCounterImpl::subtract_and_get(int64_t delta) {
344 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
346 BOOST_THROW_EXCEPTION(
347 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::subtractAndGet",
348 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
350 return invoke_add_internal(-delta,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
353 boost::future<int64_t> PNCounterImpl::decrement_and_get() {
354 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
356 BOOST_THROW_EXCEPTION(
357 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::decrementAndGet",
358 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
360 return invoke_add_internal(-1,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
363 boost::future<int64_t> PNCounterImpl::increment_and_get() {
364 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
366 BOOST_THROW_EXCEPTION(
367 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::incrementAndGet",
368 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
370 return invoke_add_internal(1,
false, EMPTY_ADDRESS_LIST,
nullptr, target);
373 boost::future<int64_t> PNCounterImpl::get_and_decrement() {
374 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
376 BOOST_THROW_EXCEPTION(
377 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::getAndDecrement",
378 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
380 return invoke_add_internal(-1,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
383 boost::future<int64_t> PNCounterImpl::get_and_increment() {
384 boost::shared_ptr<member> target = get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
386 BOOST_THROW_EXCEPTION(
387 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::getAndIncrement",
388 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
390 return invoke_add_internal(1,
true, EMPTY_ADDRESS_LIST,
nullptr, target);
393 boost::future<void> PNCounterImpl::reset() {
394 observed_clock_ = std::shared_ptr<impl::vector_clock>(
new impl::vector_clock());
395 return boost::make_ready_future();
398 boost::shared_ptr<member>
399 PNCounterImpl::get_crdt_operation_target(
const std::unordered_set<member> &excluded_addresses) {
400 auto replicaAddress = current_target_replica_address_.load();
401 if (replicaAddress && excluded_addresses.find(*replicaAddress) == excluded_addresses.end()) {
402 return replicaAddress;
406 std::lock_guard<std::mutex> guard(target_selection_mutex_);
407 replicaAddress = current_target_replica_address_.load();
408 if (!replicaAddress ||
409 excluded_addresses.find(*replicaAddress) != excluded_addresses.end()) {
410 current_target_replica_address_ = choose_target_replica(excluded_addresses);
413 return current_target_replica_address_;
416 boost::shared_ptr<member>
417 PNCounterImpl::choose_target_replica(
const std::unordered_set<member> &excluded_addresses) {
418 std::vector<member> replicaAddresses = get_replica_addresses(excluded_addresses);
419 if (replicaAddresses.empty()) {
423 int randomReplicaIndex = std::abs(rand()) % (int) replicaAddresses.size();
424 return boost::make_shared<member>(replicaAddresses[randomReplicaIndex]);
427 std::vector<member> PNCounterImpl::get_replica_addresses(
const std::unordered_set<member> &excluded_members) {
428 std::vector<member> dataMembers = get_context().get_client_cluster_service().get_members(
429 *member_selectors::DATA_MEMBER_SELECTOR);
430 int32_t replicaCount = get_max_configured_replica_count();
431 int currentReplicaCount = util::min<int>(replicaCount, (
int) dataMembers.size());
433 std::vector<member> replicaMembers;
434 for (
int i = 0; i < currentReplicaCount; i++) {
435 if (excluded_members.find(dataMembers[i]) == excluded_members.end()) {
436 replicaMembers.push_back(dataMembers[i]);
439 return replicaMembers;
442 int32_t PNCounterImpl::get_max_configured_replica_count() {
443 if (max_configured_replica_count_ > 0) {
444 return max_configured_replica_count_;
446 auto request = protocol::codec::pncounter_getconfiguredreplicacount_encode(
448 max_configured_replica_count_ = invoke_and_get_future<int32_t>(request).get();
450 return max_configured_replica_count_;
453 boost::shared_ptr<member>
454 PNCounterImpl::try_choose_a_new_target(std::shared_ptr<std::unordered_set<member>> excluded_addresses,
455 boost::shared_ptr<member> last_target,
456 const exception::hazelcast_ &last_exception) {
457 HZ_LOG(logger_, finest,
458 boost::str(boost::format(
"Exception occurred while invoking operation on target %1%, "
459 "choosing different target. Cause: %2%")
460 % last_target % last_exception)
462 if (excluded_addresses == EMPTY_ADDRESS_LIST) {
464 excluded_addresses = std::make_shared<std::unordered_set<member>>();
466 excluded_addresses->insert(*last_target);
467 return get_crdt_operation_target(*excluded_addresses);
470 boost::future<int64_t>
471 PNCounterImpl::invoke_get_internal(std::shared_ptr<std::unordered_set<member>> excluded_addresses,
472 std::exception_ptr last_exception,
473 const boost::shared_ptr<member> &target) {
475 if (last_exception) {
476 std::rethrow_exception(last_exception);
478 BOOST_THROW_EXCEPTION(
479 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::invokeGetInternal",
480 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
484 auto timestamps = observed_clock_.get()->entry_set();
485 auto request = protocol::codec::pncounter_get_encode(get_name(), timestamps, target->get_uuid());
486 return invoke_on_member(request, target->get_uuid()).then(boost::launch::sync,
487 [=](boost::future<protocol::ClientMessage> f) {
489 return get_and_update_timestamps(
491 }
catch (exception::hazelcast_ &e) {
492 return invoke_get_internal(
494 std::current_exception(),
495 try_choose_a_new_target(
500 }
catch (exception::hazelcast_ &e) {
501 return invoke_get_internal(excluded_addresses, std::current_exception(),
502 try_choose_a_new_target(excluded_addresses, target, e));
506 boost::future<int64_t>
507 PNCounterImpl::invoke_add_internal(int64_t delta,
bool getBeforeUpdate,
508 std::shared_ptr<std::unordered_set<member> > excluded_addresses,
509 std::exception_ptr last_exception,
510 const boost::shared_ptr<member> &target) {
512 if (last_exception) {
513 std::rethrow_exception(last_exception);
515 BOOST_THROW_EXCEPTION(
516 exception::no_data_member_in_cluster(
"ClientPNCounterProxy::invokeGetInternal",
517 "Cannot invoke operations on a CRDT because the cluster does not contain any data members"));
522 auto request = protocol::codec::pncounter_add_encode(
523 get_name(), delta, getBeforeUpdate, observed_clock_.get()->entry_set(), target->get_uuid());
524 return invoke_on_member(request, target->get_uuid()).then(boost::launch::sync,
525 [=](boost::future<protocol::ClientMessage> f) {
527 return get_and_update_timestamps(
529 }
catch (exception::hazelcast_ &e) {
530 return invoke_add_internal(delta,
533 std::current_exception(),
534 try_choose_a_new_target(
540 }
catch (exception::hazelcast_ &e) {
541 return invoke_add_internal(delta, getBeforeUpdate, excluded_addresses, std::current_exception(),
542 try_choose_a_new_target(excluded_addresses, target, e));
546 int64_t PNCounterImpl::get_and_update_timestamps(boost::future<protocol::ClientMessage> f) {
548 auto value = msg.get_first_fixed_sized_field<int64_t>();
551 update_observed_replica_timestamps(msg.get<impl::vector_clock::timestamp_vector>());
555 void PNCounterImpl::update_observed_replica_timestamps(
556 const impl::vector_clock::timestamp_vector &received_logical_timestamps) {
557 std::shared_ptr<impl::vector_clock> received = to_vector_clock(received_logical_timestamps);
559 std::shared_ptr<impl::vector_clock> currentClock = this->observed_clock_;
560 if (currentClock->is_after(*received)) {
563 if (observed_clock_.compare_and_set(currentClock, received)) {
569 std::shared_ptr<impl::vector_clock> PNCounterImpl::to_vector_clock(
570 const impl::vector_clock::timestamp_vector &replica_logical_timestamps) {
571 return std::shared_ptr<impl::vector_clock>(
572 new impl::vector_clock(replica_logical_timestamps));
575 boost::shared_ptr<member> PNCounterImpl::get_current_target_replica_address() {
576 return current_target_replica_address_.load();
579 IListImpl::IListImpl(
const std::string &instance_name, spi::ClientContext *context)
580 : ProxyImpl(
"hz:impl:listService", instance_name, context) {
581 serialization::pimpl::data key_data = get_context().get_serialization_service().to_data<std::string>(
583 partition_id_ = get_partition_id(key_data);
586 boost::future<bool> IListImpl::remove_item_listener(boost::uuids::uuid registration_id) {
587 return get_context().get_client_listener_service().deregister_listener(registration_id);
590 boost::future<int> IListImpl::size() {
591 auto request = protocol::codec::list_size_encode(get_name());
592 return invoke_and_get_future<int>(request, partition_id_);
595 boost::future<bool> IListImpl::is_empty() {
596 auto request = protocol::codec::list_isempty_encode(get_name());
597 return invoke_and_get_future<bool>(request, partition_id_);
600 boost::future<bool> IListImpl::contains(
const serialization::pimpl::data &element) {
601 auto request = protocol::codec::list_contains_encode(get_name(), element);
602 return invoke_and_get_future<bool>(request, partition_id_);
605 boost::future<std::vector<serialization::pimpl::data>> IListImpl::to_array_data() {
606 auto request = protocol::codec::list_getall_encode(get_name());
607 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
610 boost::future<bool> IListImpl::add(
const serialization::pimpl::data &element) {
611 auto request = protocol::codec::list_add_encode(get_name(), element);
612 return invoke_and_get_future<bool>(request, partition_id_);
615 boost::future<bool> IListImpl::remove(
const serialization::pimpl::data &element) {
616 auto request = protocol::codec::list_remove_encode(get_name(), element);
617 return invoke_and_get_future<bool>(request, partition_id_);
620 boost::future<bool> IListImpl::contains_all_data(
const std::vector<serialization::pimpl::data> &elements) {
621 auto request = protocol::codec::list_containsall_encode(get_name(), elements);
622 return invoke_and_get_future<bool>(request, partition_id_);
625 boost::future<bool> IListImpl::add_all_data(
const std::vector<serialization::pimpl::data> &elements) {
626 auto request = protocol::codec::list_addall_encode(get_name(), elements);
627 return invoke_and_get_future<bool>(request, partition_id_);
630 boost::future<bool> IListImpl::add_all_data(
int index,
const std::vector<serialization::pimpl::data> &elements) {
631 auto request = protocol::codec::list_addallwithindex_encode(get_name(), index,
633 return invoke_and_get_future<bool>(request, partition_id_);
636 boost::future<bool> IListImpl::remove_all_data(
const std::vector<serialization::pimpl::data> &elements) {
637 auto request = protocol::codec::list_compareandremoveall_encode(get_name(), elements);
638 return invoke_and_get_future<bool>(request, partition_id_);
641 boost::future<bool> IListImpl::retain_all_data(
const std::vector<serialization::pimpl::data> &elements) {
642 auto request = protocol::codec::list_compareandretainall_encode(get_name(), elements);
643 return invoke_and_get_future<bool>(request, partition_id_);
646 boost::future<void> IListImpl::clear() {
647 auto request = protocol::codec::list_clear_encode(get_name());
648 return to_void_future(invoke_on_partition(request, partition_id_));
651 boost::future<boost::optional<serialization::pimpl::data>> IListImpl::get_data(
int index) {
652 auto request = protocol::codec::list_get_encode(get_name(), index);
653 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
656 boost::future<boost::optional<serialization::pimpl::data>> IListImpl::set_data(
int index,
657 const serialization::pimpl::data &element) {
658 auto request = protocol::codec::list_set_encode(get_name(), index, element);
659 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
662 boost::future<void> IListImpl::add(
int index,
const serialization::pimpl::data &element) {
663 auto request = protocol::codec::list_addwithindex_encode(get_name(), index, element);
664 return to_void_future(invoke_on_partition(request, partition_id_));
667 boost::future<boost::optional<serialization::pimpl::data>> IListImpl::remove_data(
int index) {
668 auto request = protocol::codec::list_removewithindex_encode(get_name(), index);
669 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
672 boost::future<int> IListImpl::index_of(
const serialization::pimpl::data &element) {
673 auto request = protocol::codec::list_indexof_encode(get_name(), element);
674 return invoke_and_get_future<int>(request, partition_id_);
677 boost::future<int> IListImpl::last_index_of(
const serialization::pimpl::data &element) {
678 auto request = protocol::codec::list_lastindexof_encode(get_name(), element);
679 return invoke_and_get_future<int>(request, partition_id_);
682 boost::future<std::vector<serialization::pimpl::data>> IListImpl::sub_list_data(
int from_index,
int to_index) {
683 auto request = protocol::codec::list_sub_encode(get_name(), from_index, to_index);
684 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
687 std::shared_ptr<spi::impl::ListenerMessageCodec> IListImpl::create_item_listener_codec(
bool include_value) {
688 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
689 new ListListenerMessageCodec(get_name(), include_value));
692 IListImpl::ListListenerMessageCodec::ListListenerMessageCodec(std::string name,
693 bool include_value) : name_(std::move(name)),
697 protocol::ClientMessage
698 IListImpl::ListListenerMessageCodec::encode_add_request(
bool local_only)
const {
699 return protocol::codec::list_addlistener_encode(name_, include_value_, local_only);
702 protocol::ClientMessage
703 IListImpl::ListListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id)
const {
704 return protocol::codec::list_removelistener_encode(name_, real_registration_id);
707 flake_id_generator_impl::Block::Block(IdBatch &&id_batch, std::chrono::milliseconds validity)
708 : id_batch_(id_batch), invalid_since_(std::chrono::steady_clock::now() + validity), num_returned_(0) {}
710 int64_t flake_id_generator_impl::Block::next() {
711 if (invalid_since_ <= std::chrono::steady_clock::now()) {
716 index = num_returned_;
717 if (index == id_batch_.get_batch_size()) {
720 }
while (!num_returned_.compare_exchange_strong(index, index + 1));
722 return id_batch_.get_base() + index * id_batch_.get_increment();
725 flake_id_generator_impl::IdBatch::IdIterator flake_id_generator_impl::IdBatch::endOfBatch;
727 const int64_t flake_id_generator_impl::IdBatch::get_base()
const {
731 const int64_t flake_id_generator_impl::IdBatch::get_increment()
const {
735 const int32_t flake_id_generator_impl::IdBatch::get_batch_size()
const {
739 flake_id_generator_impl::IdBatch::IdBatch(int64_t base, int64_t increment, int32_t batch_size)
740 : base_(base), increment_(increment), batch_size_(batch_size) {}
742 flake_id_generator_impl::IdBatch::IdIterator &flake_id_generator_impl::IdBatch::end() {
746 flake_id_generator_impl::IdBatch::IdIterator flake_id_generator_impl::IdBatch::iterator() {
747 return flake_id_generator_impl::IdBatch::IdIterator(base_, increment_, batch_size_);
750 flake_id_generator_impl::IdBatch::IdIterator::IdIterator(int64_t base2,
const int64_t increment, int32_t remaining) : base2_(
751 base2), increment_(increment), remaining_(remaining) {}
753 bool flake_id_generator_impl::IdBatch::IdIterator::operator==(
const flake_id_generator_impl::IdBatch::IdIterator &rhs)
const {
754 return base2_ == rhs.base2_ && increment_ == rhs.increment_ && remaining_ == rhs.remaining_;
757 bool flake_id_generator_impl::IdBatch::IdIterator::operator!=(
const flake_id_generator_impl::IdBatch::IdIterator &rhs)
const {
758 return !(rhs == *
this);
761 flake_id_generator_impl::IdBatch::IdIterator::IdIterator() : base2_(-1), increment_(-1), remaining_(-1) {
764 flake_id_generator_impl::IdBatch::IdIterator &flake_id_generator_impl::IdBatch::IdIterator::operator++() {
765 if (remaining_ == 0) {
766 return flake_id_generator_impl::IdBatch::end();
771 base2_ += increment_;
777 flake_id_generator_impl::flake_id_generator_impl(
const std::string &service_name,
const std::string &object_name,
778 spi::ClientContext *context)
779 : ProxyImpl(service_name, object_name, context), block_(nullptr) {
780 auto config = context->get_client_config().find_flake_id_generator_config(object_name);
781 batch_size_ = config->get_prefetch_count();
782 validity_ = config->get_prefetch_validity_duration();
785 int64_t flake_id_generator_impl::new_id_internal() {
786 auto b = block_.load();
788 int64_t res = b->next();
789 if (res != INT64_MIN) {
794 throw std::overflow_error(
"");
797 boost::future<int64_t> flake_id_generator_impl::new_id() {
799 return boost::make_ready_future(new_id_internal());
800 }
catch (std::overflow_error &) {
801 return new_id_batch(batch_size_).then(boost::launch::sync,
802 [=](boost::future<flake_id_generator_impl::IdBatch> f) {
803 auto newBlock = boost::make_shared<Block>(f.get(),
805 auto value = newBlock->next();
806 auto b = block_.load();
807 block_.compare_exchange_strong(b, newBlock);
813 boost::future<flake_id_generator_impl::IdBatch> flake_id_generator_impl::new_id_batch(int32_t size) {
814 auto request = protocol::codec::flakeidgenerator_newidbatch_encode(
816 return invoke(request).then(boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
818 msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN);
820 auto base = msg.get<int64_t>();
821 auto increment = msg.get<int64_t>();
822 auto batch_size = msg.get<int32_t>();
823 return flake_id_generator_impl::IdBatch(base, increment, batch_size);
827 IQueueImpl::IQueueImpl(
const std::string &instance_name, spi::ClientContext *context)
828 : ProxyImpl(
"hz:impl:queueService", instance_name, context) {
829 serialization::pimpl::data data = get_context().get_serialization_service().to_data<std::string>(
831 partition_id_ = get_partition_id(data);
834 boost::future<bool> IQueueImpl::remove_item_listener(
835 boost::uuids::uuid registration_id) {
836 return get_context().get_client_listener_service().deregister_listener(registration_id);
839 boost::future<bool> IQueueImpl::offer(
const serialization::pimpl::data &element, std::chrono::milliseconds timeout) {
840 auto request = protocol::codec::queue_offer_encode(get_name(), element,
841 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
842 return invoke_and_get_future<bool>(request, partition_id_);
845 boost::future<void> IQueueImpl::put(
const serialization::pimpl::data &element) {
846 auto request = protocol::codec::queue_put_encode(get_name(), element);
847 return to_void_future(invoke_on_partition(request, partition_id_));
850 boost::future<boost::optional<serialization::pimpl::data>> IQueueImpl::poll_data(std::chrono::milliseconds timeout) {
851 auto request = protocol::codec::queue_poll_encode(get_name(), std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
852 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
855 boost::future<int> IQueueImpl::remaining_capacity() {
856 auto request = protocol::codec::queue_remainingcapacity_encode(get_name());
857 return invoke_and_get_future<int>(request, partition_id_);
860 boost::future<bool> IQueueImpl::remove(
const serialization::pimpl::data &element) {
861 auto request = protocol::codec::queue_remove_encode(get_name(), element);
862 return invoke_and_get_future<bool>(request, partition_id_);
865 boost::future<bool> IQueueImpl::contains(
const serialization::pimpl::data &element) {
866 auto request = protocol::codec::queue_contains_encode(get_name(), element);
867 return invoke_and_get_future<bool>(request, partition_id_);
870 boost::future<std::vector<serialization::pimpl::data>> IQueueImpl::drain_to_data(
size_t max_elements) {
871 auto request = protocol::codec::queue_draintomaxsize_encode(get_name(), (int32_t) max_elements);
873 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
876 boost::future<std::vector<serialization::pimpl::data>> IQueueImpl::drain_to_data() {
877 auto request = protocol::codec::queue_drainto_encode(get_name());
878 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
881 boost::future<boost::optional<serialization::pimpl::data>> IQueueImpl::take_data() {
882 auto request = protocol::codec::queue_take_encode(get_name());
883 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
886 boost::future<boost::optional<serialization::pimpl::data>> IQueueImpl::peek_data() {
887 auto request = protocol::codec::queue_peek_encode(get_name());
888 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, partition_id_);
891 boost::future<int> IQueueImpl::size() {
892 auto request = protocol::codec::queue_size_encode(get_name());
893 return invoke_and_get_future<int>(request, partition_id_);
896 boost::future<bool> IQueueImpl::is_empty() {
897 auto request = protocol::codec::queue_isempty_encode(get_name());
898 return invoke_and_get_future<bool>(request, partition_id_);
901 boost::future<std::vector<serialization::pimpl::data>> IQueueImpl::to_array_data() {
902 auto request = protocol::codec::queue_iterator_encode(get_name());
903 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
906 boost::future<bool> IQueueImpl::contains_all_data(
const std::vector<serialization::pimpl::data> &elements) {
907 auto request = protocol::codec::queue_containsall_encode(get_name(), elements);
908 return invoke_and_get_future<bool>(request, partition_id_);
911 boost::future<bool> IQueueImpl::add_all_data(
const std::vector<serialization::pimpl::data> &elements) {
912 auto request = protocol::codec::queue_addall_encode(get_name(), elements);
913 return invoke_and_get_future<bool>(request, partition_id_);
916 boost::future<bool> IQueueImpl::remove_all_data(
const std::vector<serialization::pimpl::data> &elements) {
917 auto request = protocol::codec::queue_compareandremoveall_encode(get_name(), elements);
918 return invoke_and_get_future<bool>(request, partition_id_);
921 boost::future<bool> IQueueImpl::retain_all_data(
const std::vector<serialization::pimpl::data> &elements) {
922 auto request = protocol::codec::queue_compareandretainall_encode(get_name(), elements);
923 return invoke_and_get_future<bool>(request, partition_id_);
926 boost::future<void> IQueueImpl::clear() {
927 auto request = protocol::codec::queue_clear_encode(get_name());
928 return to_void_future(invoke_on_partition(request, partition_id_));
931 std::shared_ptr<spi::impl::ListenerMessageCodec>
932 IQueueImpl::create_item_listener_codec(
bool include_value) {
933 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
934 new QueueListenerMessageCodec(get_name(), include_value));
937 IQueueImpl::QueueListenerMessageCodec::QueueListenerMessageCodec(std::string name,
938 bool include_value) : name_(std::move(name)),
942 protocol::ClientMessage
943 IQueueImpl::QueueListenerMessageCodec::encode_add_request(
bool local_only)
const {
944 return protocol::codec::queue_addlistener_encode(name_, include_value_, local_only);
947 protocol::ClientMessage
948 IQueueImpl::QueueListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id)
const {
949 return protocol::codec::queue_removelistener_encode(name_, real_registration_id);
952 ProxyImpl::ProxyImpl(
const std::string &service_name,
const std::string &object_name,
953 spi::ClientContext *context)
954 : ClientProxy(object_name, service_name, *context), SerializingProxy(*context, object_name) {}
956 ProxyImpl::~ProxyImpl() =
default;
958 SerializingProxy::SerializingProxy(spi::ClientContext &context,
const std::string &object_name)
959 : serialization_service_(context.get_serialization_service()),
960 partition_service_(context.get_partition_service()), object_name_(object_name), client_context_(context) {}
962 int SerializingProxy::get_partition_id(
const serialization::pimpl::data &key) {
963 return partition_service_.get_partition_id(key);
966 boost::future<protocol::ClientMessage> SerializingProxy::invoke_on_partition(
967 protocol::ClientMessage &request,
int partition_id) {
969 return spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_, partition_id)->invoke();
970 }
catch (exception::iexception &) {
971 util::exception_util::rethrow(std::current_exception());
972 return boost::make_ready_future(protocol::ClientMessage(0));
976 boost::future<protocol::ClientMessage> SerializingProxy::invoke(protocol::ClientMessage &request) {
978 return spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_)->invoke();
979 }
catch (exception::iexception &) {
980 util::exception_util::rethrow(std::current_exception());
981 return boost::make_ready_future(protocol::ClientMessage(0));
985 boost::future<protocol::ClientMessage> SerializingProxy::invoke_on_connection(protocol::ClientMessage &request,
986 std::shared_ptr<connection::Connection> connection) {
988 return spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_, connection)->invoke();
989 }
catch (exception::iexception &) {
990 util::exception_util::rethrow(std::current_exception());
991 return boost::make_ready_future(protocol::ClientMessage(0));
995 boost::future<protocol::ClientMessage>
996 SerializingProxy::invoke_on_key_owner(protocol::ClientMessage &request,
997 const serialization::pimpl::data &key_data) {
999 return invoke_on_partition(request, get_partition_id(key_data));
1000 }
catch (exception::iexception &) {
1001 util::exception_util::rethrow(std::current_exception());
1002 return boost::make_ready_future(protocol::ClientMessage(0));
1006 boost::future<protocol::ClientMessage>
1007 SerializingProxy::invoke_on_member(protocol::ClientMessage &request, boost::uuids::uuid uuid) {
1009 auto invocation = spi::impl::ClientInvocation::create(client_context_, std::make_shared<protocol::ClientMessage>(std::move(request)), object_name_, uuid);
1010 return invocation->invoke();
1011 }
catch (exception::iexception &) {
1012 util::exception_util::rethrow(std::current_exception());
1013 return boost::make_ready_future(protocol::ClientMessage(0));
1018 boost::future<boost::optional<serialization::pimpl::data>>
1019 SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request) {
1020 return decode_optional_var_sized<serialization::pimpl::data>(invoke(request));
1024 boost::future<boost::optional<map::data_entry_view>>
1025 SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request,
const serialization::pimpl::data &key) {
1026 return decode_optional_var_sized<map::data_entry_view>(invoke_on_key_owner(request, key));
1030 boost::future<boost::optional<serialization::pimpl::data>>
1031 SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request,
int partition_id) {
1032 return decode_optional_var_sized<serialization::pimpl::data>(invoke_on_partition(request, partition_id));
1036 boost::future<boost::optional<serialization::pimpl::data>>
1037 SerializingProxy::invoke_and_get_future(protocol::ClientMessage &request,
const serialization::pimpl::data &key) {
1038 return decode_optional_var_sized<serialization::pimpl::data>(invoke_on_key_owner(request, key));
1041 PartitionSpecificClientProxy::PartitionSpecificClientProxy(
const std::string &service_name,
1042 const std::string &object_name,
1043 spi::ClientContext *context) : ProxyImpl(
1044 service_name, object_name, context), partition_id_(-1) {}
1046 void PartitionSpecificClientProxy::on_initialize() {
1047 std::string partitionKey = internal::partition::strategy::StringPartitioningStrategy::get_partition_key(
1049 partition_id_ = get_context().get_partition_service().get_partition_id(to_data<std::string>(partitionKey));
1052 IMapImpl::IMapImpl(
const std::string &instance_name, spi::ClientContext *context)
1053 : ProxyImpl(
"hz:impl:mapService", instance_name, context) {}
1055 boost::future<bool> IMapImpl::contains_key(
const serialization::pimpl::data &key) {
1056 auto request = protocol::codec::map_containskey_encode(get_name(), key, util::get_current_thread_id());
1057 return invoke_and_get_future<bool>(request, key);
1060 boost::future<bool> IMapImpl::contains_value(
const serialization::pimpl::data &value) {
1061 auto request = protocol::codec::map_containsvalue_encode(get_name(), value);
1062 return invoke_and_get_future<bool>(request);
1065 boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::get_data(
const serialization::pimpl::data &key) {
1066 auto request = protocol::codec::map_get_encode(get_name(), key, util::get_current_thread_id());
1067 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1070 boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::remove_data(
const serialization::pimpl::data &key) {
1071 auto request = protocol::codec::map_remove_encode(get_name(), key, util::get_current_thread_id());
1072 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1075 boost::future<bool> IMapImpl::remove(
const serialization::pimpl::data &key,
const serialization::pimpl::data &value) {
1076 auto request = protocol::codec::map_removeifsame_encode(get_name(), key, value,
1077 util::get_current_thread_id());
1078 return invoke_and_get_future<bool>(request, key);
1081 boost::future<protocol::ClientMessage> IMapImpl::remove_all(
const serialization::pimpl::data &predicate_data) {
1082 auto request = protocol::codec::map_removeall_encode(get_name(), predicate_data);
1083 return invoke(request);
1086 boost::future<protocol::ClientMessage> IMapImpl::delete_entry(
const serialization::pimpl::data &key) {
1087 auto request = protocol::codec::map_delete_encode(get_name(), key, util::get_current_thread_id());
1088 return invoke_on_partition(request, get_partition_id(key));
1091 boost::future<protocol::ClientMessage> IMapImpl::flush() {
1092 auto request = protocol::codec::map_flush_encode(get_name());
1093 return invoke(request);
1096 boost::future<bool> IMapImpl::try_remove(
const serialization::pimpl::data &key, std::chrono::milliseconds timeout) {
1097 auto request = protocol::codec::map_tryremove_encode(get_name(), key,
1098 util::get_current_thread_id(),
1099 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1101 return invoke_and_get_future<bool>(request, key);
1104 boost::future<bool> IMapImpl::try_put(
const serialization::pimpl::data &key,
const serialization::pimpl::data &value,
1105 std::chrono::milliseconds timeout) {
1106 auto request = protocol::codec::map_tryput_encode(get_name(), key, value,
1107 util::get_current_thread_id(),
1108 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1110 return invoke_and_get_future<bool>(request, key);
1113 boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::put_data(
const serialization::pimpl::data &key,
1114 const serialization::pimpl::data &value,
1115 std::chrono::milliseconds ttl) {
1116 auto request = protocol::codec::map_put_encode(get_name(), key, value,
1117 util::get_current_thread_id(),
1118 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1119 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1122 boost::future<protocol::ClientMessage> IMapImpl::put_transient(
const serialization::pimpl::data &key,
const serialization::pimpl::data &value,
1123 std::chrono::milliseconds ttl) {
1124 auto request = protocol::codec::map_puttransient_encode(get_name(), key, value,
1125 util::get_current_thread_id(),
1126 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1127 return invoke_on_partition(request, get_partition_id(key));
1130 boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::put_if_absent_data(
const serialization::pimpl::data &key,
1131 const serialization::pimpl::data &value,
1132 std::chrono::milliseconds ttl) {
1133 auto request = protocol::codec::map_putifabsent_encode(get_name(), key, value,
1134 util::get_current_thread_id(),
1135 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1136 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1139 boost::future<bool> IMapImpl::replace(
const serialization::pimpl::data &key,
const serialization::pimpl::data &old_value,
1140 const serialization::pimpl::data &new_value) {
1141 auto request = protocol::codec::map_replaceifsame_encode(get_name(), key, old_value,
1143 util::get_current_thread_id());
1145 return invoke_and_get_future<bool>(request, key);
1148 boost::future<boost::optional<serialization::pimpl::data>> IMapImpl::replace_data(
const serialization::pimpl::data &key,
1149 const serialization::pimpl::data &value) {
1150 auto request = protocol::codec::map_replace_encode(get_name(), key, value,
1151 util::get_current_thread_id());
1153 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, key);
1156 boost::future<protocol::ClientMessage>
1157 IMapImpl::set(
const serialization::pimpl::data &key,
const serialization::pimpl::data &value,
1158 std::chrono::milliseconds ttl) {
1159 auto request = protocol::codec::map_set_encode(get_name(), key, value,
1160 util::get_current_thread_id(),
1161 std::chrono::duration_cast<std::chrono::milliseconds>(
1163 return invoke_on_partition(request, get_partition_id(key));
1166 boost::future<protocol::ClientMessage> IMapImpl::lock(
const serialization::pimpl::data &key) {
1167 return lock(key, std::chrono::milliseconds(-1));
1170 boost::future<protocol::ClientMessage> IMapImpl::lock(
const serialization::pimpl::data &key, std::chrono::milliseconds lease_time) {
1171 auto request = protocol::codec::map_lock_encode(get_name(), key, util::get_current_thread_id(),
1172 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1173 lock_reference_id_generator_->get_next_reference_id());
1174 return invoke_on_partition(request, get_partition_id(key));
1177 boost::future<bool> IMapImpl::is_locked(
const serialization::pimpl::data &key) {
1178 auto request = protocol::codec::map_islocked_encode(get_name(), key);
1180 return invoke_and_get_future<bool>(request, key);
1183 boost::future<bool> IMapImpl::try_lock(
const serialization::pimpl::data &key, std::chrono::milliseconds timeout) {
1184 return try_lock(key, timeout, std::chrono::milliseconds(-1));
1188 IMapImpl::try_lock(
const serialization::pimpl::data &key, std::chrono::milliseconds timeout,
1189 std::chrono::milliseconds lease_time) {
1190 auto request = protocol::codec::map_trylock_encode(get_name(), key, util::get_current_thread_id(),
1191 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1192 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
1193 lock_reference_id_generator_->get_next_reference_id());
1194 return invoke_and_get_future<bool>(request, key);
1197 boost::future<protocol::ClientMessage> IMapImpl::unlock(
const serialization::pimpl::data &key) {
1198 auto request = protocol::codec::map_unlock_encode(get_name(), key, util::get_current_thread_id(),
1199 lock_reference_id_generator_->get_next_reference_id());
1200 return invoke_on_partition(request, get_partition_id(key));
1203 boost::future<protocol::ClientMessage> IMapImpl::force_unlock(
const serialization::pimpl::data &key) {
1204 auto request = protocol::codec::map_forceunlock_encode(get_name(), key,
1205 lock_reference_id_generator_->get_next_reference_id());
1206 return invoke_on_partition(request, get_partition_id(key));
1209 boost::future<std::string> IMapImpl::add_interceptor(
const serialization::pimpl::data &interceptor) {
1210 auto request = protocol::codec::map_addinterceptor_encode(get_name(), interceptor);
1211 return invoke_and_get_future<std::string>(request);
1215 boost::future<boost::uuids::uuid> IMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
bool include_value, int32_t listener_flags) {
1216 return register_listener(create_map_entry_listener_codec(include_value, listener_flags), std::move(entry_event_handler));
1219 boost::future<boost::uuids::uuid>
1220 IMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1221 serialization::pimpl::data &&predicate,
bool include_value,
1222 int32_t listener_flags) {
1223 return register_listener(
1224 create_map_entry_listener_codec(include_value, std::move(predicate), listener_flags),
1225 std::move(entry_event_handler));
1228 boost::future<bool> IMapImpl::remove_entry_listener(boost::uuids::uuid registration_id) {
1229 return get_context().get_client_listener_service().deregister_listener(registration_id);
1232 boost::future<boost::uuids::uuid>
1233 IMapImpl::add_entry_listener(std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1234 bool include_value, serialization::pimpl::data &&key, int32_t listener_flags) {
1235 return register_listener(create_map_entry_listener_codec(include_value, listener_flags, std::move(key)),
1236 std::move(entry_event_handler));
1239 boost::future<boost::optional<map::data_entry_view>> IMapImpl::get_entry_view_data(
const serialization::pimpl::data &key) {
1240 auto request = protocol::codec::map_getentryview_encode(get_name(), key,
1241 util::get_current_thread_id());
1242 return invoke_and_get_future<boost::optional<map::data_entry_view>>(request, key);
1245 boost::future<bool> IMapImpl::evict(
const serialization::pimpl::data &key) {
1246 auto request = protocol::codec::map_evict_encode(get_name(), key, util::get_current_thread_id());
1247 return invoke_and_get_future<bool>(request, key);
1250 boost::future<protocol::ClientMessage> IMapImpl::evict_all() {
1251 auto request = protocol::codec::map_evictall_encode(get_name());
1252 return invoke(request);
1255 boost::future<EntryVector>
1256 IMapImpl::get_all_data(
int partition_id,
const std::vector<serialization::pimpl::data> &keys) {
1257 auto request = protocol::codec::map_getall_encode(get_name(), keys);
1258 return invoke_and_get_future<EntryVector>(request, partition_id);
1261 boost::future<std::vector<serialization::pimpl::data>> IMapImpl::key_set_data() {
1262 auto request = protocol::codec::map_keyset_encode(get_name());
1263 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1266 boost::future<std::vector<serialization::pimpl::data>> IMapImpl::key_set_data(
const serialization::pimpl::data &predicate) {
1267 auto request = protocol::codec::map_keysetwithpredicate_encode(get_name(), predicate);
1268 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1271 boost::future<std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>> IMapImpl::key_set_for_paging_predicate_data(
1272 protocol::codec::holder::paging_predicate_holder
const & predicate) {
1273 auto request = protocol::codec::map_keysetwithpagingpredicate_encode(get_name(), predicate);
1274 return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1275 return get_paging_predicate_response<std::vector<serialization::pimpl::data>>(std::move(f));
1279 boost::future<EntryVector> IMapImpl::entry_set_data() {
1280 auto request = protocol::codec::map_entryset_encode(get_name());
1281 return invoke_and_get_future<EntryVector>(request);
1284 boost::future<EntryVector> IMapImpl::entry_set_data(
const serialization::pimpl::data &predicate) {
1285 auto request = protocol::codec::map_entrieswithpredicate_encode(get_name(), predicate);
1286 return invoke_and_get_future<EntryVector>(request);
1289 boost::future<std::pair<EntryVector, query::anchor_data_list>> IMapImpl::entry_set_for_paging_predicate_data(
1290 protocol::codec::holder::paging_predicate_holder
const & predicate) {
1291 auto request = protocol::codec::map_entrieswithpagingpredicate_encode(get_name(), predicate);
1292 return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1293 return get_paging_predicate_response<EntryVector>(std::move(f));
1297 boost::future<std::vector<serialization::pimpl::data>> IMapImpl::values_data() {
1298 auto request = protocol::codec::map_values_encode(get_name());
1299 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1302 boost::future<std::vector<serialization::pimpl::data>> IMapImpl::values_data(
const serialization::pimpl::data &predicate) {
1303 auto request = protocol::codec::map_valueswithpredicate_encode(get_name(), predicate);
1304 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request);
1307 boost::future<std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1308 IMapImpl::values_for_paging_predicate_data(protocol::codec::holder::paging_predicate_holder
const & predicate) {
1309 auto request = protocol::codec::map_valueswithpagingpredicate_encode(get_name(), predicate);
1310 return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1311 return get_paging_predicate_response<std::vector<serialization::pimpl::data>>(std::move(f));
1315 boost::future<protocol::ClientMessage> IMapImpl::add_index_data(
const config::index_config &config) {
1316 auto request = protocol::codec::map_addindex_encode(get_name(), config);
1317 return invoke(request);
1320 boost::future<int> IMapImpl::size() {
1321 auto request = protocol::codec::map_size_encode(get_name());
1322 return invoke_and_get_future<int>(request);
1325 boost::future<bool> IMapImpl::is_empty() {
1326 auto request = protocol::codec::map_isempty_encode(get_name());
1327 return invoke_and_get_future<bool>(request);
1330 boost::future<protocol::ClientMessage> IMapImpl::put_all_data(
int partition_id,
const EntryVector &entries) {
1331 auto request = protocol::codec::map_putall_encode(get_name(), entries,
true);
1332 return invoke_on_partition(request, partition_id);
1335 boost::future<protocol::ClientMessage> IMapImpl::clear_data() {
1336 auto request = protocol::codec::map_clear_encode(get_name());
1337 return invoke(request);
1340 boost::future<boost::optional<serialization::pimpl::data>>
1341 IMapImpl::execute_on_key_data(
const serialization::pimpl::data &key,
1342 const serialization::pimpl::data &processor) {
1343 auto request = protocol::codec::map_executeonkey_encode(get_name(),
1346 util::get_current_thread_id());
1347 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request, get_partition_id(key));
1350 boost::future<boost::optional<serialization::pimpl::data>>
1351 IMapImpl::submit_to_key_data(
const serialization::pimpl::data &key,
1352 const serialization::pimpl::data &processor) {
1353 auto request = protocol::codec::map_submittokey_encode(get_name(),
1356 util::get_current_thread_id());
1357 return invoke_on_partition(request, get_partition_id(key)).then(boost::launch::sync,
1358 [](boost::future<protocol::ClientMessage> f) {
1361 return msg.get_nullable<serialization::pimpl::data>();
1365 boost::future<EntryVector> IMapImpl::execute_on_keys_data(
const std::vector<serialization::pimpl::data> &keys,
1366 const serialization::pimpl::data &processor) {
1367 auto request = protocol::codec::map_executeonkeys_encode(get_name(), processor, keys);
1368 return invoke_and_get_future<EntryVector>(request);
1371 boost::future<protocol::ClientMessage> IMapImpl::remove_interceptor(
const std::string &
id) {
1372 auto request = protocol::codec::map_removeinterceptor_encode(get_name(),
id);
1373 return invoke(request);
1376 boost::future<EntryVector> IMapImpl::execute_on_entries_data(
const serialization::pimpl::data &entry_processor) {
1377 auto request = protocol::codec::map_executeonallkeys_encode(
1378 get_name(), entry_processor);
1379 return invoke_and_get_future<EntryVector>(request);
1383 boost::future<EntryVector>
1384 IMapImpl::execute_on_entries_data(
const serialization::pimpl::data &entry_processor,
const serialization::pimpl::data &predicate) {
1385 auto request = protocol::codec::map_executewithpredicate_encode(get_name(),
1388 return invoke_and_get_future<EntryVector>(request);
1392 std::shared_ptr<spi::impl::ListenerMessageCodec>
1393 IMapImpl::create_map_entry_listener_codec(
bool include_value, serialization::pimpl::data &&predicate,
1394 int32_t listener_flags) {
1395 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1396 new MapEntryListenerWithPredicateMessageCodec(get_name(), include_value, listener_flags, std::move(predicate)));
1399 std::shared_ptr<spi::impl::ListenerMessageCodec>
1400 IMapImpl::create_map_entry_listener_codec(
bool include_value, int32_t listener_flags) {
1401 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1402 new MapEntryListenerMessageCodec(get_name(), include_value, listener_flags));
1405 std::shared_ptr<spi::impl::ListenerMessageCodec>
1406 IMapImpl::create_map_entry_listener_codec(
bool include_value, int32_t listener_flags,
1407 serialization::pimpl::data &&key) {
1408 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1409 new MapEntryListenerToKeyCodec(get_name(), include_value, listener_flags, std::move(key)));
1412 void IMapImpl::on_initialize() {
1413 ProxyImpl::on_initialize();
1414 lock_reference_id_generator_ = get_context().get_lock_reference_id_generator();
1417 IMapImpl::MapEntryListenerMessageCodec::MapEntryListenerMessageCodec(std::string name,
1419 int32_t listener_flags) : name_(std::move(name)),
1425 protocol::ClientMessage
1426 IMapImpl::MapEntryListenerMessageCodec::encode_add_request(
bool local_only)
const {
1427 return protocol::codec::map_addentrylistener_encode(name_, include_value_,
1428 static_cast<int32_t
>(listener_flags_),
1432 protocol::ClientMessage
1433 IMapImpl::MapEntryListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id)
const {
1434 return protocol::codec::map_removeentrylistener_encode(name_, real_registration_id);
1437 protocol::ClientMessage
1438 IMapImpl::MapEntryListenerToKeyCodec::encode_add_request(
bool local_only)
const {
1439 return protocol::codec::map_addentrylistenertokey_encode(name_, key_, include_value_,
1440 static_cast<int32_t
>(listener_flags_), local_only);
1443 protocol::ClientMessage
1444 IMapImpl::MapEntryListenerToKeyCodec::encode_remove_request(boost::uuids::uuid real_registration_id)
const {
1445 return protocol::codec::map_removeentrylistener_encode(name_, real_registration_id);
1448 IMapImpl::MapEntryListenerToKeyCodec::MapEntryListenerToKeyCodec(std::string name,
bool include_value,
1449 int32_t listener_flags,
1450 serialization::pimpl::data key)
1451 : name_(std::move(name)), include_value_(include_value), listener_flags_(listener_flags), key_(std::move(key)) {}
1453 IMapImpl::MapEntryListenerWithPredicateMessageCodec::MapEntryListenerWithPredicateMessageCodec(
1454 std::string name,
bool include_value, int32_t listener_flags,
1455 serialization::pimpl::data &&predicate) : name_(std::move(name)), include_value_(include_value),
1456 listener_flags_(listener_flags), predicate_(std::move(predicate)) {}
1458 protocol::ClientMessage
1459 IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_add_request(
bool local_only)
const {
1460 return protocol::codec::map_addentrylistenerwithpredicate_encode(name_, predicate_,
1462 static_cast<int32_t
>(listener_flags_), local_only);
1465 protocol::ClientMessage
1466 IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_remove_request(
1467 boost::uuids::uuid real_registration_id)
const {
1468 return protocol::codec::map_removeentrylistener_encode(name_, real_registration_id);
1471 TransactionalQueueImpl::TransactionalQueueImpl(
const std::string &name,
1472 txn::TransactionProxy &transaction_proxy)
1473 : TransactionalObject(iqueue::SERVICE_NAME, name, transaction_proxy) {}
1475 boost::future<bool> TransactionalQueueImpl::offer(
const serialization::pimpl::data &e, std::chrono::milliseconds timeout) {
1476 auto request = protocol::codec::transactionalqueue_offer_encode(
1477 get_name(), get_transaction_id(), util::get_current_thread_id(), e, std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1479 return invoke_and_get_future<bool>(request);
1482 boost::future<boost::optional<serialization::pimpl::data>> TransactionalQueueImpl::poll_data(std::chrono::milliseconds timeout) {
1483 auto request = protocol::codec::transactionalqueue_poll_encode(
1484 get_name(), get_transaction_id(), util::get_current_thread_id(), std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1486 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(request);
1489 boost::future<int> TransactionalQueueImpl::size() {
1490 auto request = protocol::codec::transactionalqueue_size_encode(
1491 get_name(), get_transaction_id(), util::get_current_thread_id());
1493 return invoke_and_get_future<int>(request);
1496 ISetImpl::ISetImpl(
const std::string &instance_name, spi::ClientContext *client_context)
1497 : ProxyImpl(iset::SERVICE_NAME, instance_name, client_context) {
1498 serialization::pimpl::data key_data = get_context().get_serialization_service().to_data<std::string>(
1500 partition_id_ = get_partition_id(key_data);
1503 boost::future<bool> ISetImpl::remove_item_listener(boost::uuids::uuid registration_id) {
1504 return get_context().get_client_listener_service().deregister_listener(registration_id);
1507 boost::future<int> ISetImpl::size() {
1508 auto request = protocol::codec::set_size_encode(get_name());
1509 return invoke_and_get_future<int>(request, partition_id_);
1512 boost::future<bool> ISetImpl::is_empty() {
1513 auto request = protocol::codec::set_isempty_encode(get_name());
1514 return invoke_and_get_future<bool>(request, partition_id_);
1517 boost::future<bool> ISetImpl::contains(
const serialization::pimpl::data &element) {
1518 auto request = protocol::codec::set_contains_encode(get_name(), element);
1519 return invoke_and_get_future<bool>(request, partition_id_);
1522 boost::future<std::vector<serialization::pimpl::data>> ISetImpl::to_array_data() {
1523 auto request = protocol::codec::set_getall_encode(get_name());
1524 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(request, partition_id_);
1527 boost::future<bool> ISetImpl::add(
const serialization::pimpl::data &element) {
1528 auto request = protocol::codec::set_add_encode(get_name(), element);
1529 return invoke_and_get_future<bool>(request, partition_id_);
1532 boost::future<bool> ISetImpl::remove(
const serialization::pimpl::data &element) {
1533 auto request = protocol::codec::set_remove_encode(get_name(), element);
1534 return invoke_and_get_future<bool>(request, partition_id_);
1537 boost::future<bool> ISetImpl::contains_all(
const std::vector<serialization::pimpl::data> &elements) {
1538 auto request = protocol::codec::set_containsall_encode(get_name(), elements);
1539 return invoke_and_get_future<bool>(request, partition_id_);
1542 boost::future<bool> ISetImpl::add_all(
const std::vector<serialization::pimpl::data> &elements) {
1543 auto request = protocol::codec::set_addall_encode(get_name(), elements);
1544 return invoke_and_get_future<bool>(request, partition_id_);
1547 boost::future<bool> ISetImpl::remove_all(
const std::vector<serialization::pimpl::data> &elements) {
1548 auto request = protocol::codec::set_compareandremoveall_encode(get_name(), elements);
1549 return invoke_and_get_future<bool>(request, partition_id_);
1552 boost::future<bool> ISetImpl::retain_all(
const std::vector<serialization::pimpl::data> &elements) {
1553 auto request = protocol::codec::set_compareandretainall_encode(get_name(), elements);
1554 return invoke_and_get_future<bool>(request, partition_id_);
1557 boost::future<void> ISetImpl::clear() {
1558 auto request = protocol::codec::set_clear_encode(get_name());
1559 return to_void_future(invoke_on_partition(request, partition_id_));
1562 std::shared_ptr<spi::impl::ListenerMessageCodec>
1563 ISetImpl::create_item_listener_codec(
bool include_value) {
1564 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1565 new SetListenerMessageCodec(get_name(), include_value));
1568 ISetImpl::SetListenerMessageCodec::SetListenerMessageCodec(std::string name,
bool include_value)
1569 : name_(std::move(name)), include_value_(include_value) {}
1571 protocol::ClientMessage
1572 ISetImpl::SetListenerMessageCodec::encode_add_request(
bool local_only)
const {
1573 return protocol::codec::set_addlistener_encode(name_, include_value_, local_only);
1576 protocol::ClientMessage
1577 ISetImpl::SetListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id)
const {
1578 return protocol::codec::set_removelistener_encode(name_, real_registration_id);
1581 ITopicImpl::ITopicImpl(
const std::string &instance_name, spi::ClientContext *context)
1582 : proxy::ProxyImpl(
"hz:impl:topicService", instance_name, context),
1583 partition_id_(get_partition_id(to_data(instance_name))) {}
1585 boost::future<void> ITopicImpl::publish(
const serialization::pimpl::data &data) {
1586 auto request = protocol::codec::topic_publish_encode(get_name(), data);
1587 return to_void_future(invoke_on_partition(request, partition_id_));
1590 boost::future<boost::uuids::uuid> ITopicImpl::add_message_listener(std::shared_ptr<impl::BaseEventHandler> topic_event_handler) {
1591 return register_listener(create_item_listener_codec(), std::move(topic_event_handler));
1594 boost::future<bool> ITopicImpl::remove_message_listener(boost::uuids::uuid registration_id) {
1595 return get_context().get_client_listener_service().deregister_listener(registration_id);
1598 std::shared_ptr<spi::impl::ListenerMessageCodec> ITopicImpl::create_item_listener_codec() {
1599 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
new TopicListenerMessageCodec(get_name()));
1602 ITopicImpl::TopicListenerMessageCodec::TopicListenerMessageCodec(std::string name) : name_(std::move(name)) {}
1604 protocol::ClientMessage
1605 ITopicImpl::TopicListenerMessageCodec::encode_add_request(
bool local_only)
const {
1606 return protocol::codec::topic_addmessagelistener_encode(name_, local_only);
1609 protocol::ClientMessage
1610 ITopicImpl::TopicListenerMessageCodec::encode_remove_request(boost::uuids::uuid real_registration_id)
const {
1611 return protocol::codec::topic_removemessagelistener_encode(name_, real_registration_id);
1614 ReplicatedMapImpl::ReplicatedMapImpl(
const std::string &service_name,
const std::string &object_name,
1615 spi::ClientContext *context) : ProxyImpl(service_name, object_name,
1617 target_partition_id_(-1) {}
1619 const int32_t RingbufferImpl::MAX_BATCH_SIZE{ 1000 };
1623 const serialization::pimpl::data &data_entry_view::get_key()
const {
1627 const serialization::pimpl::data &data_entry_view::get_value()
const {
1631 int64_t data_entry_view::get_cost()
const {
1635 int64_t data_entry_view::get_creation_time()
const {
1636 return creation_time_;
1639 int64_t data_entry_view::get_expiration_time()
const {
1640 return expiration_time_;
1643 int64_t data_entry_view::get_hits()
const {
1647 int64_t data_entry_view::get_last_access_time()
const {
1648 return last_access_time_;
1651 int64_t data_entry_view::get_last_stored_time()
const {
1652 return last_stored_time_;
1655 int64_t data_entry_view::get_last_update_time()
const {
1656 return last_update_time_;
1659 int64_t data_entry_view::get_version()
const {
1663 int64_t data_entry_view::get_ttl()
const {
1667 int64_t data_entry_view::get_max_idle()
const {
1671 data_entry_view::data_entry_view(serialization::pimpl::data &&key, serialization::pimpl::data &&value,
1672 int64_t cost, int64_t creation_time,
1673 int64_t expiration_time, int64_t hits, int64_t last_access_time,
1674 int64_t last_stored_time, int64_t last_update_time, int64_t version,
1676 int64_t max_idle) : key_(std::move(key)), value_(std::move(value)),
1678 creation_time_(creation_time),
1679 expiration_time_(expiration_time),
1680 hits_(hits), last_access_time_(last_access_time),
1681 last_stored_time_(last_stored_time),
1682 last_update_time_(last_update_time), version_(version),
1684 max_idle_(max_idle) {}
1689 namespace reliable {
1690 ReliableTopicMessage::ReliableTopicMessage() : publish_time_(std::chrono::system_clock::now()) {}
1692 ReliableTopicMessage::ReliableTopicMessage(
1693 hazelcast::client::serialization::pimpl::data &&payload_data,
1694 std::unique_ptr<address> address)
1695 : publish_time_(std::chrono::system_clock::now())
1696 , payload_(std::move(payload_data)) {
1698 publisher_address_ = boost::make_optional(*address);
1702 std::chrono::system_clock::time_point ReliableTopicMessage::get_publish_time()
const {
1703 return publish_time_;
1706 const boost::optional<address> &ReliableTopicMessage::get_publisher_address()
const {
1707 return publisher_address_;
1710 serialization::pimpl::data &ReliableTopicMessage::get_payload() {
1717 namespace serialization {
1718 int32_t hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_factory_id() {
1722 int hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_class_id() {
1723 return RELIABLE_TOPIC_MESSAGE;
1726 void hz_serializer<topic::impl::reliable::ReliableTopicMessage>::write_data(
1727 const topic::impl::reliable::ReliableTopicMessage &
object, object_data_output &out) {
1728 out.write<int64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(
object.publish_time_.time_since_epoch()).count());
1729 out.write_object(
object.publisher_address_);
1730 out.write(
object.payload_.to_byte_array());
1733 topic::impl::reliable::ReliableTopicMessage
1734 hz_serializer<topic::impl::reliable::ReliableTopicMessage>::read_data(object_data_input &in) {
1735 topic::impl::reliable::ReliableTopicMessage message;
1736 auto now = std::chrono::system_clock::now();
1737 message.publish_time_ = now + std::chrono::milliseconds(in.read<int64_t>()) - now.time_since_epoch();
1738 message.publisher_address_ = in.read_object<address>();
1739 message.payload_ = serialization::pimpl::data(in.read<std::vector<byte>>().value());
1744 entry_event::entry_event(
const std::string &name, member &&member, type event_type,
1745 typed_data &&key, typed_data &&value, typed_data &&old_value, typed_data &&merging_value)
1746 : name_(name), member_(std::move(member)), event_type_(event_type), key_(std::move(key)),
1747 value_(std::move(value)), old_value_(std::move(old_value)), merging_value_(std::move(merging_value)) {}
1762 return merging_value_;
1777 std::ostream &operator<<(std::ostream &os,
const entry_event &event) {
1778 os <<
"name: " <<
event.name_ <<
" member: " <<
event.member_ <<
" eventType: " <<
1779 static_cast<int>(
event.event_type_) <<
" key: " << event.key_.
get_type() <<
" value: " <<
event.value_.get_type() <<
1780 " oldValue: " <<
event.old_value_.get_type() <<
" mergingValue: " <<
event.merging_value_.get_type();
1785 int number_of_entries_affected)
1786 : member_(
member), event_type_(event_type), name_(name), number_of_entries_affected_(number_of_entries_affected) {}
1801 return number_of_entries_affected_;
1804 std::ostream &operator<<(std::ostream &os,
const map_event &event) {
1805 os <<
"MapEvent{member: " <<
event.member_ <<
" eventType: " <<
static_cast<int>(
event.event_type_) <<
" name: " << event.name_
1806 <<
" numberOfEntriesAffected: " << event.number_of_entries_affected_;
1810 item_event_base::item_event_base(
const std::string &name,
const member &member,
const item_event_type &event_type)
1811 : name_(name), member_(member), event_type_(event_type) {}
1825 item_event_base::~item_event_base() =
default;
1827 flake_id_generator::flake_id_generator(
const std::string &object_name, spi::ClientContext *context)
1828 : flake_id_generator_impl(SERVICE_NAME, object_name, context) {}
const typed_data & get_key() const
Returns the key of the entry event.
const std::string & get_name() const
Returns the name of the map for this event.
const typed_data & get_old_value() const
Returns the old value of the entry event.
type get_event_type() const
Return the event type.
const member & get_member() const
Returns the member fired this event.
const typed_data & get_value() const
Returns the value of the entry event.
const typed_data & get_merging_value() const
Returns the incoming merging value of the entry event.
static const std::chrono::milliseconds UNSET
Default TTL value of a record.
item_event_type get_event_type() const
Return the event type.
const member & get_member() const
Returns the member fired this event.
const std::string & get_name() const
Returns the name of the collection for this event.
Map events common contract.
const std::string & get_name() const
Returns the name of the map for this event.
entry_event::type get_event_type() const
Return the event type.
int get_number_of_entries_affected() const
Returns the number of entries affected by this event.
map_event(member &&member, entry_event::type event_type, const std::string &name, int number_of_entries_affected)
Constructor.
const member & get_member() const
Returns the member fired this event.
typed_data class is a wrapper class for the serialized binary data.
serialization::pimpl::object_type get_type() const