Hazelcast C++ Client
Hazelcast C++ Client Library
Loading...
Searching...
No Matches
proxy.cpp
1/*
2 * Copyright (c) 2008-2025, Hazelcast, Inc. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <unordered_set>
18#include <atomic>
19
20#include "hazelcast/client/impl/ClientLockReferenceIdGenerator.h"
21#include "hazelcast/client/proxy/PNCounterImpl.h"
22#include "hazelcast/client/spi/ClientContext.h"
23#include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
24#include "hazelcast/client/proxy/flake_id_generator_impl.h"
25#include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
26#include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
27#include "hazelcast/client/client_config.h"
28#include "hazelcast/client/map/data_entry_view.h"
29#include "hazelcast/client/proxy/RingbufferImpl.h"
30#include "hazelcast/client/impl/vector_clock.h"
31#include "hazelcast/client/internal/partition/strategy/StringPartitioningStrategy.h"
32#include "hazelcast/util/Util.h"
33#include "hazelcast/client/topic/reliable_listener.h"
34#include "hazelcast/client/proxy/ITopicImpl.h"
35#include "hazelcast/client/proxy/ReplicatedMapImpl.h"
36#include "hazelcast/client/flake_id_generator.h"
37#include "hazelcast/client/reliable_topic.h"
38
39namespace hazelcast {
40namespace client {
41const std::chrono::milliseconds imap::UNSET{ -1 };
42
43reliable_topic::reliable_topic(const std::string& instance_name,
44 spi::ClientContext* context)
45 : proxy::ProxyImpl(reliable_topic::SERVICE_NAME, instance_name, context)
46 , execution_service_(
47 context->get_client_execution_service().shared_from_this())
48 , executor_(execution_service_->get_user_executor())
49 , logger_(context->get_logger())
50{
51 auto reliable_config =
52 context->get_client_config().lookup_reliable_topic_config(instance_name);
53 if (reliable_config) {
54 batch_size_ = reliable_config->get_read_batch_size();
55 } else {
56 batch_size_ = config::reliable_topic_config::DEFAULT_READ_BATCH_SIZE;
57 }
58
59 ringbuffer_ =
60 context->get_hazelcast_client_implementation()
61 ->get_distributed_object<ringbuffer>(
62 std::string(reliable_topic::TOPIC_RB_PREFIX) + instance_name);
63}
64
65bool
66reliable_topic::remove_message_listener(const std::string& registration_id)
67{
68 int id = util::IOUtil::to_value<int>(registration_id);
69 auto runner = runners_map_.get(id);
70 if (!runner) {
71 return false;
72 }
73 runner->cancel();
74 return true;
75}
76
77void
78reliable_topic::on_shutdown()
79{
80 // cancel all runners
81 for (auto& entry : runners_map_.clear()) {
82 entry.second->cancel();
83 }
84}
85
86void
87reliable_topic::on_destroy()
88{
89 // cancel all runners
90 for (auto& entry : runners_map_.clear()) {
91 entry.second->cancel();
92 }
93}
94
95void
96reliable_topic::post_destroy()
97{
98 // destroy the underlying ringbuffer
99 ringbuffer_.get()->destroy().get();
100}
101
102namespace topic {
104 int64_t initial_sequence_id)
105 : loss_tolerant_(loss_tolerant)
106 , initial_sequence_id_(initial_sequence_id)
107{}
108} // namespace topic
109
110namespace impl {
111ClientLockReferenceIdGenerator::ClientLockReferenceIdGenerator()
112 : reference_id_counter_(0)
113{}
114
115int64_t
116ClientLockReferenceIdGenerator::get_next_reference_id()
117{
118 return ++reference_id_counter_;
119}
120} // namespace impl
121
122namespace proxy {
123MultiMapImpl::MultiMapImpl(const std::string& instance_name,
124 spi::ClientContext* context)
125 : ProxyImpl(multi_map::SERVICE_NAME, instance_name, context)
126{
127 // TODO: remove this line once the client instance get_distributed_object
128 // works as expected in Java for this proxy type
129 lock_reference_id_generator_ =
130 get_context().get_lock_reference_id_generator();
131}
132
133boost::future<bool>
134MultiMapImpl::put(const serialization::pimpl::data& key,
135 const serialization::pimpl::data& value)
136{
137 auto request = protocol::codec::multimap_put_encode(
138 get_name(), key, value, util::get_current_thread_id());
139 return invoke_and_get_future<bool>(request, key);
140}
141
142boost::future<std::vector<serialization::pimpl::data>>
143MultiMapImpl::get_data(const serialization::pimpl::data& key)
144{
145 auto request = protocol::codec::multimap_get_encode(
146 get_name(), key, util::get_current_thread_id());
147 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
148 request, key);
149}
150
151boost::future<bool>
152MultiMapImpl::remove(const serialization::pimpl::data& key,
153 const serialization::pimpl::data& value)
154{
155 auto request = protocol::codec::multimap_removeentry_encode(
156 get_name(), key, value, util::get_current_thread_id());
157 return invoke_and_get_future<bool>(request, key);
158}
159
160boost::future<std::vector<serialization::pimpl::data>>
161MultiMapImpl::remove_data(const serialization::pimpl::data& key)
162{
163 auto request = protocol::codec::multimap_remove_encode(
164 get_name(), key, util::get_current_thread_id());
165 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
166 request, key);
167}
168
169boost::future<std::vector<serialization::pimpl::data>>
170MultiMapImpl::key_set_data()
171{
172 auto request = protocol::codec::multimap_keyset_encode(get_name());
173 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
174 request);
175}
176
177boost::future<std::vector<serialization::pimpl::data>>
178MultiMapImpl::values_data()
179{
180 auto request = protocol::codec::multimap_values_encode(get_name());
181 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
182 request);
183}
184
185boost::future<EntryVector>
186MultiMapImpl::entry_set_data()
187{
188 auto request = protocol::codec::multimap_entryset_encode(get_name());
189 return invoke_and_get_future<EntryVector>(request);
190}
191
192boost::future<bool>
193MultiMapImpl::contains_key(const serialization::pimpl::data& key)
194{
195 auto request = protocol::codec::multimap_containskey_encode(
196 get_name(), key, util::get_current_thread_id());
197 return invoke_and_get_future<bool>(request, key);
198}
199
200boost::future<bool>
201MultiMapImpl::contains_value(const serialization::pimpl::data& value)
202{
203 auto request =
204 protocol::codec::multimap_containsvalue_encode(get_name(), value);
205 return invoke_and_get_future<bool>(request);
206}
207
208boost::future<bool>
209MultiMapImpl::contains_entry(const serialization::pimpl::data& key,
210 const serialization::pimpl::data& value)
211{
212 auto request = protocol::codec::multimap_containsentry_encode(
213 get_name(), key, value, util::get_current_thread_id());
214 return invoke_and_get_future<bool>(request, key);
215}
216
217boost::future<int>
218MultiMapImpl::size()
219{
220 auto request = protocol::codec::multimap_size_encode(get_name());
221 return invoke_and_get_future<int>(request);
222}
223
224boost::future<void>
225MultiMapImpl::clear()
226{
227 auto request = protocol::codec::multimap_clear_encode(get_name());
228 return to_void_future(invoke(request));
229}
230
231boost::future<int>
232MultiMapImpl::value_count(const serialization::pimpl::data& key)
233{
234 auto request = protocol::codec::multimap_valuecount_encode(
235 get_name(), key, util::get_current_thread_id());
236 return invoke_and_get_future<int>(request, key);
237}
238
239boost::future<boost::uuids::uuid>
240MultiMapImpl::add_entry_listener(
241 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
242 bool include_value)
243{
244 return register_listener(
245 create_multi_map_entry_listener_codec(include_value),
246 std::move(entry_event_handler));
247}
248
249boost::future<boost::uuids::uuid>
250MultiMapImpl::add_entry_listener(
251 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
252 bool include_value,
253 serialization::pimpl::data&& key)
254{
255 return register_listener(
256 create_multi_map_entry_listener_codec(include_value, std::move(key)),
257 std::move(entry_event_handler));
258}
259
260boost::future<bool>
261MultiMapImpl::remove_entry_listener(boost::uuids::uuid registration_id)
262{
263 return get_context().get_client_listener_service().deregister_listener(
264 registration_id);
265}
266
267boost::future<void>
268MultiMapImpl::lock(const serialization::pimpl::data& key)
269{
270 return lock(key, std::chrono::milliseconds(-1));
271}
272
273boost::future<void>
274MultiMapImpl::lock(const serialization::pimpl::data& key,
275 std::chrono::milliseconds lease_time)
276{
277 auto request = protocol::codec::multimap_lock_encode(
278 get_name(),
279 key,
280 util::get_current_thread_id(),
281 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
282 lock_reference_id_generator_->get_next_reference_id());
283 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
284}
285
286boost::future<bool>
287MultiMapImpl::is_locked(const serialization::pimpl::data& key)
288{
289 auto request = protocol::codec::multimap_islocked_encode(get_name(), key);
290 return invoke_and_get_future<bool>(request, key);
291}
292
293boost::future<bool>
294MultiMapImpl::try_lock(const serialization::pimpl::data& key)
295{
296 auto request = protocol::codec::multimap_trylock_encode(
297 get_name(),
298 key,
299 util::get_current_thread_id(),
300 INT64_MAX,
301 0,
302 lock_reference_id_generator_->get_next_reference_id());
303 return invoke_and_get_future<bool>(request, key);
304}
305
306boost::future<bool>
307MultiMapImpl::try_lock(const serialization::pimpl::data& key,
308 std::chrono::milliseconds timeout)
309{
310 return try_lock(key, timeout, std::chrono::milliseconds(INT64_MAX));
311}
312
313boost::future<bool>
314MultiMapImpl::try_lock(const serialization::pimpl::data& key,
315 std::chrono::milliseconds timeout,
316 std::chrono::milliseconds lease_time)
317{
318 auto request = protocol::codec::multimap_trylock_encode(
319 get_name(),
320 key,
321 util::get_current_thread_id(),
322 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
323 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
324 lock_reference_id_generator_->get_next_reference_id());
325 return invoke_and_get_future<bool>(request, key);
326}
327
328boost::future<void>
329MultiMapImpl::unlock(const serialization::pimpl::data& key)
330{
331 auto request = protocol::codec::multimap_unlock_encode(
332 get_name(),
333 key,
334 util::get_current_thread_id(),
335 lock_reference_id_generator_->get_next_reference_id());
336 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
337}
338
339boost::future<void>
340MultiMapImpl::force_unlock(const serialization::pimpl::data& key)
341{
342 auto request = protocol::codec::multimap_forceunlock_encode(
343 get_name(), key, lock_reference_id_generator_->get_next_reference_id());
344 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
345}
346
347std::shared_ptr<spi::impl::ListenerMessageCodec>
348MultiMapImpl::create_multi_map_entry_listener_codec(bool include_value)
349{
350 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
351 new MultiMapEntryListenerMessageCodec(get_name(), include_value));
352}
353
354std::shared_ptr<spi::impl::ListenerMessageCodec>
355MultiMapImpl::create_multi_map_entry_listener_codec(
356 bool include_value,
357 serialization::pimpl::data&& key)
358{
359 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
360 new MultiMapEntryListenerToKeyCodec(
361 get_name(), include_value, std::move(key)));
362}
363
364void
365MultiMapImpl::on_initialize()
366{
367 ProxyImpl::on_initialize();
368 lock_reference_id_generator_ =
369 get_context().get_lock_reference_id_generator();
370}
371
372MultiMapImpl::MultiMapEntryListenerMessageCodec::
373 MultiMapEntryListenerMessageCodec(std::string name, bool include_value)
374 : name_(std::move(name))
375 , include_value_(include_value)
376{}
377
378protocol::ClientMessage
379MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_add_request(
380 bool local_only) const
381{
382 return protocol::codec::multimap_addentrylistener_encode(
383 name_, include_value_, local_only);
384}
385
386protocol::ClientMessage
387MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_remove_request(
388 boost::uuids::uuid real_registration_id) const
389{
390 return protocol::codec::multimap_removeentrylistener_encode(
391 name_, real_registration_id);
392}
393
394protocol::ClientMessage
395MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_add_request(
396 bool local_only) const
397{
398 return protocol::codec::multimap_addentrylistenertokey_encode(
399 name_, key_, include_value_, local_only);
400}
401
402protocol::ClientMessage
403MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_remove_request(
404 boost::uuids::uuid real_registration_id) const
405{
406 return protocol::codec::multimap_removeentrylistener_encode(
407 name_, real_registration_id);
408}
409
410MultiMapImpl::MultiMapEntryListenerToKeyCodec::MultiMapEntryListenerToKeyCodec(
411 std::string name,
412 bool include_value,
413 serialization::pimpl::data&& key)
414 : name_(std::move(name))
415 , include_value_(include_value)
416 , key_(std::move(key))
417{}
418
419const std::shared_ptr<std::unordered_set<member>>
420 PNCounterImpl::EMPTY_ADDRESS_LIST(new std::unordered_set<member>());
421
422PNCounterImpl::PNCounterImpl(const std::string& service_name,
423 const std::string& object_name,
424 spi::ClientContext* context)
425 : ProxyImpl(service_name, object_name, context)
426 , max_configured_replica_count_(0)
427 , observed_clock_(
428 std::shared_ptr<impl::vector_clock>(new impl::vector_clock()))
429 , logger_(context->get_logger())
430{}
431
432std::ostream&
433operator<<(std::ostream& os, const PNCounterImpl& proxy)
434{
435 os << "PNCounter{name='" << proxy.get_name() << "\'}";
436 return os;
437}
438
439boost::future<int64_t>
440PNCounterImpl::get()
441{
442 boost::shared_ptr<member> target =
443 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
444 if (!target) {
445 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
446 "ClientPNCounterProxy::get",
447 "Cannot invoke operations on a CRDT because the cluster does not "
448 "contain any data members"));
449 }
450 return invoke_get_internal(EMPTY_ADDRESS_LIST, nullptr, target);
451}
452
453boost::future<int64_t>
454PNCounterImpl::get_and_add(int64_t delta)
455{
456 boost::shared_ptr<member> target =
457 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
458 if (!target) {
459 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
460 "ClientPNCounterProxy::getAndAdd",
461 "Cannot invoke operations on a CRDT because the cluster does not "
462 "contain any data members"));
463 }
464 return invoke_add_internal(
465 delta, true, EMPTY_ADDRESS_LIST, nullptr, target);
466}
467
468boost::future<int64_t>
469PNCounterImpl::add_and_get(int64_t delta)
470{
471 boost::shared_ptr<member> target =
472 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
473 if (!target) {
474 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
475 "ClientPNCounterProxy::addAndGet",
476 "Cannot invoke operations on a CRDT because the cluster does not "
477 "contain any data members"));
478 }
479 return invoke_add_internal(
480 delta, false, EMPTY_ADDRESS_LIST, nullptr, target);
481}
482
483boost::future<int64_t>
484PNCounterImpl::get_and_subtract(int64_t delta)
485{
486 boost::shared_ptr<member> target =
487 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
488 if (!target) {
489 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
490 "ClientPNCounterProxy::getAndSubtract",
491 "Cannot invoke operations on a CRDT because the cluster does not "
492 "contain any data members"));
493 }
494 return invoke_add_internal(
495 -delta, true, EMPTY_ADDRESS_LIST, nullptr, target);
496}
497
498boost::future<int64_t>
499PNCounterImpl::subtract_and_get(int64_t delta)
500{
501 boost::shared_ptr<member> target =
502 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
503 if (!target) {
504 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
505 "ClientPNCounterProxy::subtractAndGet",
506 "Cannot invoke operations on a CRDT because the cluster does not "
507 "contain any data members"));
508 }
509 return invoke_add_internal(
510 -delta, false, EMPTY_ADDRESS_LIST, nullptr, target);
511}
512
513boost::future<int64_t>
514PNCounterImpl::decrement_and_get()
515{
516 boost::shared_ptr<member> target =
517 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
518 if (!target) {
519 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
520 "ClientPNCounterProxy::decrementAndGet",
521 "Cannot invoke operations on a CRDT because the cluster does not "
522 "contain any data members"));
523 }
524 return invoke_add_internal(-1, false, EMPTY_ADDRESS_LIST, nullptr, target);
525}
526
527boost::future<int64_t>
528PNCounterImpl::increment_and_get()
529{
530 boost::shared_ptr<member> target =
531 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
532 if (!target) {
533 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
534 "ClientPNCounterProxy::incrementAndGet",
535 "Cannot invoke operations on a CRDT because the cluster does not "
536 "contain any data members"));
537 }
538 return invoke_add_internal(1, false, EMPTY_ADDRESS_LIST, nullptr, target);
539}
540
541boost::future<int64_t>
542PNCounterImpl::get_and_decrement()
543{
544 boost::shared_ptr<member> target =
545 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
546 if (!target) {
547 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
548 "ClientPNCounterProxy::getAndDecrement",
549 "Cannot invoke operations on a CRDT because the cluster does not "
550 "contain any data members"));
551 }
552 return invoke_add_internal(-1, true, EMPTY_ADDRESS_LIST, nullptr, target);
553}
554
555boost::future<int64_t>
556PNCounterImpl::get_and_increment()
557{
558 boost::shared_ptr<member> target =
559 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
560 if (!target) {
561 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
562 "ClientPNCounterProxy::getAndIncrement",
563 "Cannot invoke operations on a CRDT because the cluster does not "
564 "contain any data members"));
565 }
566 return invoke_add_internal(1, true, EMPTY_ADDRESS_LIST, nullptr, target);
567}
568
569boost::future<void>
570PNCounterImpl::reset()
571{
572 observed_clock_ =
573 std::shared_ptr<impl::vector_clock>(new impl::vector_clock());
574 return boost::make_ready_future();
575}
576
577boost::shared_ptr<member>
578PNCounterImpl::get_crdt_operation_target(
579 const std::unordered_set<member>& excluded_addresses)
580{
581 auto replicaAddress = current_target_replica_address_.load();
582 if (replicaAddress &&
583 excluded_addresses.find(*replicaAddress) == excluded_addresses.end()) {
584 return replicaAddress;
585 }
586
587 {
588 std::lock_guard<std::mutex> guard(target_selection_mutex_);
589 replicaAddress = current_target_replica_address_.load();
590 if (!replicaAddress || excluded_addresses.find(*replicaAddress) !=
591 excluded_addresses.end()) {
592 current_target_replica_address_ =
593 choose_target_replica(excluded_addresses);
594 }
595 }
596 return current_target_replica_address_;
597}
598
599boost::shared_ptr<member>
600PNCounterImpl::choose_target_replica(
601 const std::unordered_set<member>& excluded_addresses)
602{
603 std::vector<member> replicaAddresses =
604 get_replica_addresses(excluded_addresses);
605 if (replicaAddresses.empty()) {
606 return nullptr;
607 }
608 // TODO: Use a random generator as used in Java (ThreadLocalRandomProvider)
609 // which is per thread
610 int randomReplicaIndex = std::abs(rand()) % (int)replicaAddresses.size();
611 return boost::make_shared<member>(replicaAddresses[randomReplicaIndex]);
612}
613
614std::vector<member>
615PNCounterImpl::get_replica_addresses(
616 const std::unordered_set<member>& excluded_members)
617{
618 std::vector<member> dataMembers =
619 get_context().get_client_cluster_service().get_members(
620 *member_selectors::DATA_MEMBER_SELECTOR);
621 int32_t replicaCount = get_max_configured_replica_count();
622 int currentReplicaCount =
623 util::min<int>(replicaCount, (int)dataMembers.size());
624
625 std::vector<member> replicaMembers;
626 for (int i = 0; i < currentReplicaCount; i++) {
627 if (excluded_members.find(dataMembers[i]) == excluded_members.end()) {
628 replicaMembers.push_back(dataMembers[i]);
629 }
630 }
631 return replicaMembers;
632}
633
634int32_t
635PNCounterImpl::get_max_configured_replica_count()
636{
637 if (max_configured_replica_count_ > 0) {
638 return max_configured_replica_count_;
639 } else {
640 auto request =
641 protocol::codec::pncounter_getconfiguredreplicacount_encode(
642 get_name());
643 max_configured_replica_count_ =
644 invoke_and_get_future<int32_t>(request).get();
645 }
646 return max_configured_replica_count_;
647}
648
649boost::shared_ptr<member>
650PNCounterImpl::try_choose_a_new_target(
651 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
652 boost::shared_ptr<member> last_target,
653 const exception::hazelcast_& last_exception)
654{
655 HZ_LOG(
656 logger_,
657 finest,
658 boost::str(boost::format(
659 "Exception occurred while invoking operation on target %1%, "
660 "choosing different target. Cause: %2%") %
661 last_target % last_exception));
662 if (excluded_addresses == EMPTY_ADDRESS_LIST) {
663 // TODO: Make sure that this only affects the local variable of the
664 // method
665 excluded_addresses = std::make_shared<std::unordered_set<member>>();
666 }
667 excluded_addresses->insert(*last_target);
668 return get_crdt_operation_target(*excluded_addresses);
669}
670
671boost::future<int64_t>
672PNCounterImpl::invoke_get_internal(
673 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
674 std::exception_ptr last_exception,
675 const boost::shared_ptr<member>& target)
676{
677 if (!target) {
678 if (last_exception) {
679 std::rethrow_exception(last_exception);
680 } else {
681 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
682 "ClientPNCounterProxy::invokeGetInternal",
683 "Cannot invoke operations on a CRDT because the cluster does not "
684 "contain any data members"));
685 }
686 }
687 try {
688 auto timestamps = observed_clock_.get()->entry_set();
689 auto request = protocol::codec::pncounter_get_encode(
690 get_name(), timestamps, target->get_uuid());
691 return invoke_on_member(request, target->get_uuid())
692 .then(
693 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
694 try {
695 return get_and_update_timestamps(std::move(f));
696 } catch (exception::hazelcast_& e) {
697 return invoke_get_internal(excluded_addresses,
698 std::current_exception(),
699 try_choose_a_new_target(
700 excluded_addresses, target, e))
701 .get();
702 }
703 });
704 } catch (exception::hazelcast_& e) {
705 return invoke_get_internal(
706 excluded_addresses,
707 std::current_exception(),
708 try_choose_a_new_target(excluded_addresses, target, e));
709 }
710}
711
712boost::future<int64_t>
713PNCounterImpl::invoke_add_internal(
714 int64_t delta,
715 bool getBeforeUpdate,
716 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
717 std::exception_ptr last_exception,
718 const boost::shared_ptr<member>& target)
719{
720 if (!target) {
721 if (last_exception) {
722 std::rethrow_exception(last_exception);
723 } else {
724 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
725 "ClientPNCounterProxy::invokeGetInternal",
726 "Cannot invoke operations on a CRDT because the cluster does not "
727 "contain any data members"));
728 }
729 }
730
731 try {
732 auto request = protocol::codec::pncounter_add_encode(
733 get_name(),
734 delta,
735 getBeforeUpdate,
736 observed_clock_.get()->entry_set(),
737 target->get_uuid());
738 return invoke_on_member(request, target->get_uuid())
739 .then(
740 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
741 try {
742 return get_and_update_timestamps(std::move(f));
743 } catch (exception::hazelcast_& e) {
744 return invoke_add_internal(delta,
745 getBeforeUpdate,
746 excluded_addresses,
747 std::current_exception(),
748 try_choose_a_new_target(
749 excluded_addresses, target, e))
750 .get();
751 }
752 });
753 } catch (exception::hazelcast_& e) {
754 return invoke_add_internal(
755 delta,
756 getBeforeUpdate,
757 excluded_addresses,
758 std::current_exception(),
759 try_choose_a_new_target(excluded_addresses, target, e));
760 }
761}
762
763int64_t
764PNCounterImpl::get_and_update_timestamps(
765 boost::future<protocol::ClientMessage> f)
766{
767 auto msg = f.get();
768 auto value = msg.get_first_fixed_sized_field<int64_t>();
769 // skip replica count
770 msg.get<int32_t>();
771 update_observed_replica_timestamps(
772 msg.get<impl::vector_clock::timestamp_vector>());
773 return value;
774}
775
776void
777PNCounterImpl::update_observed_replica_timestamps(
778 const impl::vector_clock::timestamp_vector& received_logical_timestamps)
779{
780 std::shared_ptr<impl::vector_clock> received =
781 to_vector_clock(received_logical_timestamps);
782 for (;;) {
783 std::shared_ptr<impl::vector_clock> currentClock =
784 this->observed_clock_;
785 if (currentClock->is_after(*received)) {
786 break;
787 }
788 if (observed_clock_.compare_and_set(currentClock, received)) {
789 break;
790 }
791 }
792}
793
794std::shared_ptr<impl::vector_clock>
795PNCounterImpl::to_vector_clock(
796 const impl::vector_clock::timestamp_vector& replica_logical_timestamps)
797{
798 return std::shared_ptr<impl::vector_clock>(
799 new impl::vector_clock(replica_logical_timestamps));
800}
801
802boost::shared_ptr<member>
803PNCounterImpl::get_current_target_replica_address()
804{
805 return current_target_replica_address_.load();
806}
807
808IListImpl::IListImpl(const std::string& instance_name,
809 spi::ClientContext* context)
810 : ProxyImpl("hz:impl:listService", instance_name, context)
811{
812 serialization::pimpl::data key_data =
813 get_context().get_serialization_service().to_data<std::string>(
814 &instance_name);
815 partition_id_ = get_partition_id(key_data);
816}
817
818boost::future<bool>
819IListImpl::remove_item_listener(boost::uuids::uuid registration_id)
820{
821 return get_context().get_client_listener_service().deregister_listener(
822 registration_id);
823}
824
825boost::future<int>
826IListImpl::size()
827{
828 auto request = protocol::codec::list_size_encode(get_name());
829 return invoke_and_get_future<int>(request, partition_id_);
830}
831
832boost::future<bool>
833IListImpl::is_empty()
834{
835 auto request = protocol::codec::list_isempty_encode(get_name());
836 return invoke_and_get_future<bool>(request, partition_id_);
837}
838
839boost::future<bool>
840IListImpl::contains(const serialization::pimpl::data& element)
841{
842 auto request = protocol::codec::list_contains_encode(get_name(), element);
843 return invoke_and_get_future<bool>(request, partition_id_);
844}
845
846boost::future<std::vector<serialization::pimpl::data>>
847IListImpl::to_array_data()
848{
849 auto request = protocol::codec::list_getall_encode(get_name());
850 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
851 request, partition_id_);
852}
853
854boost::future<bool>
855IListImpl::add(const serialization::pimpl::data& element)
856{
857 auto request = protocol::codec::list_add_encode(get_name(), element);
858 return invoke_and_get_future<bool>(request, partition_id_);
859}
860
861boost::future<bool>
862IListImpl::remove(const serialization::pimpl::data& element)
863{
864 auto request = protocol::codec::list_remove_encode(get_name(), element);
865 return invoke_and_get_future<bool>(request, partition_id_);
866}
867
868boost::future<bool>
869IListImpl::contains_all_data(
870 const std::vector<serialization::pimpl::data>& elements)
871{
872 auto request =
873 protocol::codec::list_containsall_encode(get_name(), elements);
874 return invoke_and_get_future<bool>(request, partition_id_);
875}
876
877boost::future<bool>
878IListImpl::add_all_data(const std::vector<serialization::pimpl::data>& elements)
879{
880 auto request = protocol::codec::list_addall_encode(get_name(), elements);
881 return invoke_and_get_future<bool>(request, partition_id_);
882}
883
884boost::future<bool>
885IListImpl::add_all_data(int index,
886 const std::vector<serialization::pimpl::data>& elements)
887{
888 auto request =
889 protocol::codec::list_addallwithindex_encode(get_name(), index, elements);
890 return invoke_and_get_future<bool>(request, partition_id_);
891}
892
893boost::future<bool>
894IListImpl::remove_all_data(
895 const std::vector<serialization::pimpl::data>& elements)
896{
897 auto request =
898 protocol::codec::list_compareandremoveall_encode(get_name(), elements);
899 return invoke_and_get_future<bool>(request, partition_id_);
900}
901
902boost::future<bool>
903IListImpl::retain_all_data(
904 const std::vector<serialization::pimpl::data>& elements)
905{
906 auto request =
907 protocol::codec::list_compareandretainall_encode(get_name(), elements);
908 return invoke_and_get_future<bool>(request, partition_id_);
909}
910
911boost::future<void>
912IListImpl::clear()
913{
914 auto request = protocol::codec::list_clear_encode(get_name());
915 return to_void_future(invoke_on_partition(request, partition_id_));
916}
917
918boost::future<boost::optional<serialization::pimpl::data>>
919IListImpl::get_data(int index)
920{
921 auto request = protocol::codec::list_get_encode(get_name(), index);
922 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
923 request, partition_id_);
924}
925
926boost::future<boost::optional<serialization::pimpl::data>>
927IListImpl::set_data(int index, const serialization::pimpl::data& element)
928{
929 auto request = protocol::codec::list_set_encode(get_name(), index, element);
930 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
931 request, partition_id_);
932}
933
934boost::future<void>
935IListImpl::add(int index, const serialization::pimpl::data& element)
936{
937 auto request =
938 protocol::codec::list_addwithindex_encode(get_name(), index, element);
939 return to_void_future(invoke_on_partition(request, partition_id_));
940}
941
942boost::future<boost::optional<serialization::pimpl::data>>
943IListImpl::remove_data(int index)
944{
945 auto request =
946 protocol::codec::list_removewithindex_encode(get_name(), index);
947 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
948 request, partition_id_);
949}
950
951boost::future<int>
952IListImpl::index_of(const serialization::pimpl::data& element)
953{
954 auto request = protocol::codec::list_indexof_encode(get_name(), element);
955 return invoke_and_get_future<int>(request, partition_id_);
956}
957
958boost::future<int>
959IListImpl::last_index_of(const serialization::pimpl::data& element)
960{
961 auto request =
962 protocol::codec::list_lastindexof_encode(get_name(), element);
963 return invoke_and_get_future<int>(request, partition_id_);
964}
965
966boost::future<std::vector<serialization::pimpl::data>>
967IListImpl::sub_list_data(int from_index, int to_index)
968{
969 auto request =
970 protocol::codec::list_sub_encode(get_name(), from_index, to_index);
971 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
972 request, partition_id_);
973}
974
975std::shared_ptr<spi::impl::ListenerMessageCodec>
976IListImpl::create_item_listener_codec(bool include_value)
977{
978 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
979 new ListListenerMessageCodec(get_name(), include_value));
980}
981
982IListImpl::ListListenerMessageCodec::ListListenerMessageCodec(
983 std::string name,
984 bool include_value)
985 : name_(std::move(name))
986 , include_value_(include_value)
987{}
988
989protocol::ClientMessage
990IListImpl::ListListenerMessageCodec::encode_add_request(bool local_only) const
991{
992 return protocol::codec::list_addlistener_encode(
993 name_, include_value_, local_only);
994}
995
996protocol::ClientMessage
997IListImpl::ListListenerMessageCodec::encode_remove_request(
998 boost::uuids::uuid real_registration_id) const
999{
1000 return protocol::codec::list_removelistener_encode(name_,
1001 real_registration_id);
1002}
1003
1004flake_id_generator_impl::Block::Block(IdBatch&& id_batch,
1005 std::chrono::milliseconds validity)
1006 : id_batch_(id_batch)
1007 , invalid_since_(std::chrono::steady_clock::now() + validity)
1008 , num_returned_(0)
1009{}
1010
1011int64_t
1012flake_id_generator_impl::Block::next()
1013{
1014 if (invalid_since_ <= std::chrono::steady_clock::now()) {
1015 return INT64_MIN;
1016 }
1017 int32_t index;
1018 do {
1019 index = num_returned_;
1020 if (index == id_batch_.get_batch_size()) {
1021 return INT64_MIN;
1022 }
1023 } while (!num_returned_.compare_exchange_strong(index, index + 1));
1024
1025 return id_batch_.get_base() + index * id_batch_.get_increment();
1026}
1027
1028flake_id_generator_impl::IdBatch::IdIterator
1029 flake_id_generator_impl::IdBatch::endOfBatch;
1030
1031int64_t
1032flake_id_generator_impl::IdBatch::get_base() const
1033{
1034 return base_;
1035}
1036
1037int64_t
1038flake_id_generator_impl::IdBatch::get_increment() const
1039{
1040 return increment_;
1041}
1042
1043int32_t
1044flake_id_generator_impl::IdBatch::get_batch_size() const
1045{
1046 return batch_size_;
1047}
1048
1049flake_id_generator_impl::IdBatch::IdBatch(int64_t base,
1050 int64_t increment,
1051 int32_t batch_size)
1052 : base_(base)
1053 , increment_(increment)
1054 , batch_size_(batch_size)
1055{}
1056
1057flake_id_generator_impl::IdBatch::IdIterator&
1058flake_id_generator_impl::IdBatch::end()
1059{
1060 return endOfBatch;
1061}
1062
1063flake_id_generator_impl::IdBatch::IdIterator
1064flake_id_generator_impl::IdBatch::iterator()
1065{
1066 return flake_id_generator_impl::IdBatch::IdIterator(
1067 base_, increment_, batch_size_);
1068}
1069
1070flake_id_generator_impl::IdBatch::IdIterator::IdIterator(
1071 int64_t base2,
1072 const int64_t increment,
1073 int32_t remaining)
1074 : base2_(base2)
1075 , increment_(increment)
1076 , remaining_(remaining)
1077{}
1078
1079bool
1080flake_id_generator_impl::IdBatch::IdIterator::operator==(
1081 const flake_id_generator_impl::IdBatch::IdIterator& rhs) const
1082{
1083 return base2_ == rhs.base2_ && increment_ == rhs.increment_ &&
1084 remaining_ == rhs.remaining_;
1085}
1086
1087bool
1088flake_id_generator_impl::IdBatch::IdIterator::operator!=(
1089 const flake_id_generator_impl::IdBatch::IdIterator& rhs) const
1090{
1091 return !(rhs == *this);
1092}
1093const int64_t&
1094flake_id_generator_impl::IdBatch::IdIterator::operator*() const
1095{
1096 return base2_;
1097}
1098
1099flake_id_generator_impl::IdBatch::IdIterator::IdIterator()
1100 : base2_(-1)
1101 , increment_(-1)
1102 , remaining_(-1)
1103{}
1104
1105flake_id_generator_impl::IdBatch::IdIterator&
1106flake_id_generator_impl::IdBatch::IdIterator::operator++()
1107{
1108 if (remaining_ == 0) {
1109 return flake_id_generator_impl::IdBatch::end();
1110 }
1111
1112 --remaining_;
1113
1114 base2_ += increment_;
1115
1116 return *this;
1117}
1118
1119flake_id_generator_impl::flake_id_generator_impl(
1120 const std::string& service_name,
1121 const std::string& object_name,
1122 spi::ClientContext* context)
1123 : ProxyImpl(service_name, object_name, context)
1124 , block_(nullptr)
1125{
1126 auto config =
1127 context->get_client_config().find_flake_id_generator_config(object_name);
1128 batch_size_ = config->get_prefetch_count();
1129 validity_ = config->get_prefetch_validity_duration();
1130}
1131
1132int64_t
1133flake_id_generator_impl::new_id_internal()
1134{
1135 auto b = block_.load();
1136 if (b) {
1137 int64_t res = b->next();
1138 if (res != INT64_MIN) {
1139 return res;
1140 }
1141 }
1142
1143 throw std::overflow_error("");
1144}
1145
1146boost::future<int64_t>
1147flake_id_generator_impl::new_id()
1148{
1149 try {
1150 return boost::make_ready_future(new_id_internal());
1151 } catch (std::overflow_error&) {
1152 return new_id_batch(batch_size_)
1153 .then(boost::launch::sync,
1154 [=](boost::future<flake_id_generator_impl::IdBatch> f) {
1155 auto newBlock =
1156 boost::make_shared<Block>(f.get(), validity_);
1157 auto value = newBlock->next();
1158 auto b = block_.load();
1159 block_.compare_exchange_strong(b, newBlock);
1160 return value;
1161 });
1162 }
1163}
1164
1165boost::future<flake_id_generator_impl::IdBatch>
1166flake_id_generator_impl::new_id_batch(int32_t size)
1167{
1168 auto request =
1169 protocol::codec::flakeidgenerator_newidbatch_encode(get_name(), size);
1170 return invoke(request).then(
1171 boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
1172 auto msg = f.get();
1173 msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN);
1174
1175 auto base = msg.get<int64_t>();
1176 auto increment = msg.get<int64_t>();
1177 auto batch_size = msg.get<int32_t>();
1178 return flake_id_generator_impl::IdBatch(base, increment, batch_size);
1179 });
1180}
1181
1182IQueueImpl::IQueueImpl(const std::string& instance_name,
1183 spi::ClientContext* context)
1184 : ProxyImpl("hz:impl:queueService", instance_name, context)
1185{
1186 serialization::pimpl::data data =
1187 get_context().get_serialization_service().to_data<std::string>(
1188 &instance_name);
1189 partition_id_ = get_partition_id(data);
1190}
1191
1192boost::future<bool>
1193IQueueImpl::remove_item_listener(boost::uuids::uuid registration_id)
1194{
1195 return get_context().get_client_listener_service().deregister_listener(
1196 registration_id);
1197}
1198
1199boost::future<bool>
1200IQueueImpl::offer(const serialization::pimpl::data& element,
1201 std::chrono::milliseconds timeout)
1202{
1203 auto request = protocol::codec::queue_offer_encode(
1204 get_name(),
1205 element,
1206 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1207 return invoke_and_get_future<bool>(request, partition_id_);
1208}
1209
1210boost::future<void>
1211IQueueImpl::put(const serialization::pimpl::data& element)
1212{
1213 auto request = protocol::codec::queue_put_encode(get_name(), element);
1214 return to_void_future(invoke_on_partition(request, partition_id_));
1215}
1216
1217boost::future<boost::optional<serialization::pimpl::data>>
1218IQueueImpl::poll_data(std::chrono::milliseconds timeout)
1219{
1220 auto request = protocol::codec::queue_poll_encode(
1221 get_name(),
1222 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1223 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1224 request, partition_id_);
1225}
1226
1227boost::future<int>
1228IQueueImpl::remaining_capacity()
1229{
1230 auto request = protocol::codec::queue_remainingcapacity_encode(get_name());
1231 return invoke_and_get_future<int>(request, partition_id_);
1232}
1233
1234boost::future<bool>
1235IQueueImpl::remove(const serialization::pimpl::data& element)
1236{
1237 auto request = protocol::codec::queue_remove_encode(get_name(), element);
1238 return invoke_and_get_future<bool>(request, partition_id_);
1239}
1240
1241boost::future<bool>
1242IQueueImpl::contains(const serialization::pimpl::data& element)
1243{
1244 auto request = protocol::codec::queue_contains_encode(get_name(), element);
1245 return invoke_and_get_future<bool>(request, partition_id_);
1246}
1247
1248boost::future<std::vector<serialization::pimpl::data>>
1249IQueueImpl::drain_to_data(size_t max_elements)
1250{
1251 auto request = protocol::codec::queue_draintomaxsize_encode(
1252 get_name(), (int32_t)max_elements);
1253
1254 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1255 request, partition_id_);
1256}
1257
1258boost::future<std::vector<serialization::pimpl::data>>
1259IQueueImpl::drain_to_data()
1260{
1261 auto request = protocol::codec::queue_drainto_encode(get_name());
1262 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1263 request, partition_id_);
1264}
1265
1266boost::future<boost::optional<serialization::pimpl::data>>
1267IQueueImpl::take_data()
1268{
1269 auto request = protocol::codec::queue_take_encode(get_name());
1270 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1271 request, partition_id_);
1272}
1273
1274boost::future<boost::optional<serialization::pimpl::data>>
1275IQueueImpl::peek_data()
1276{
1277 auto request = protocol::codec::queue_peek_encode(get_name());
1278 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1279 request, partition_id_);
1280}
1281
1282boost::future<int>
1283IQueueImpl::size()
1284{
1285 auto request = protocol::codec::queue_size_encode(get_name());
1286 return invoke_and_get_future<int>(request, partition_id_);
1287}
1288
1289boost::future<bool>
1290IQueueImpl::is_empty()
1291{
1292 auto request = protocol::codec::queue_isempty_encode(get_name());
1293 return invoke_and_get_future<bool>(request, partition_id_);
1294}
1295
1296boost::future<std::vector<serialization::pimpl::data>>
1297IQueueImpl::to_array_data()
1298{
1299 auto request = protocol::codec::queue_iterator_encode(get_name());
1300 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1301 request, partition_id_);
1302}
1303
1304boost::future<bool>
1305IQueueImpl::contains_all_data(
1306 const std::vector<serialization::pimpl::data>& elements)
1307{
1308 auto request =
1309 protocol::codec::queue_containsall_encode(get_name(), elements);
1310 return invoke_and_get_future<bool>(request, partition_id_);
1311}
1312
1313boost::future<bool>
1314IQueueImpl::add_all_data(
1315 const std::vector<serialization::pimpl::data>& elements)
1316{
1317 auto request = protocol::codec::queue_addall_encode(get_name(), elements);
1318 return invoke_and_get_future<bool>(request, partition_id_);
1319}
1320
1321boost::future<bool>
1322IQueueImpl::remove_all_data(
1323 const std::vector<serialization::pimpl::data>& elements)
1324{
1325 auto request =
1326 protocol::codec::queue_compareandremoveall_encode(get_name(), elements);
1327 return invoke_and_get_future<bool>(request, partition_id_);
1328}
1329
1330boost::future<bool>
1331IQueueImpl::retain_all_data(
1332 const std::vector<serialization::pimpl::data>& elements)
1333{
1334 auto request =
1335 protocol::codec::queue_compareandretainall_encode(get_name(), elements);
1336 return invoke_and_get_future<bool>(request, partition_id_);
1337}
1338
1339boost::future<void>
1340IQueueImpl::clear()
1341{
1342 auto request = protocol::codec::queue_clear_encode(get_name());
1343 return to_void_future(invoke_on_partition(request, partition_id_));
1344}
1345
1346std::shared_ptr<spi::impl::ListenerMessageCodec>
1347IQueueImpl::create_item_listener_codec(bool include_value)
1348{
1349 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1350 new QueueListenerMessageCodec(get_name(), include_value));
1351}
1352
1353IQueueImpl::QueueListenerMessageCodec::QueueListenerMessageCodec(
1354 std::string name,
1355 bool include_value)
1356 : name_(std::move(name))
1357 , include_value_(include_value)
1358{}
1359
1360protocol::ClientMessage
1361IQueueImpl::QueueListenerMessageCodec::encode_add_request(bool local_only) const
1362{
1363 return protocol::codec::queue_addlistener_encode(
1364 name_, include_value_, local_only);
1365}
1366
1367protocol::ClientMessage
1368IQueueImpl::QueueListenerMessageCodec::encode_remove_request(
1369 boost::uuids::uuid real_registration_id) const
1370{
1371 return protocol::codec::queue_removelistener_encode(name_,
1372 real_registration_id);
1373}
1374
1375ProxyImpl::ProxyImpl(const std::string& service_name,
1376 const std::string& object_name,
1377 spi::ClientContext* context)
1378 : ClientProxy(object_name, service_name, *context)
1379 , SerializingProxy(*context, object_name)
1380{}
1381
1382ProxyImpl::~ProxyImpl() = default;
1383
1384SerializingProxy::SerializingProxy(spi::ClientContext& context,
1385 const std::string& object_name)
1386 : serialization_service_(context.get_serialization_service())
1387 , partition_service_(context.get_partition_service())
1388 , object_name_(object_name)
1389 , client_context_(context)
1390{}
1391
1392int
1393SerializingProxy::get_partition_id(const serialization::pimpl::data& key)
1394{
1395 return partition_service_.get_partition_id(key);
1396}
1397
1398boost::future<protocol::ClientMessage>
1399SerializingProxy::invoke_on_partition(protocol::ClientMessage& request,
1400 int partition_id)
1401{
1402 try {
1403 return spi::impl::ClientInvocation::create(
1404 client_context_,
1405 std::make_shared<protocol::ClientMessage>(std::move(request)),
1406 object_name_,
1407 partition_id)
1408 ->invoke();
1409 } catch (exception::iexception&) {
1410 util::exception_util::rethrow(std::current_exception());
1411 return boost::make_ready_future(protocol::ClientMessage(0));
1412 }
1413}
1414
1415boost::future<protocol::ClientMessage>
1416SerializingProxy::invoke(protocol::ClientMessage& request)
1417{
1418 try {
1419 return spi::impl::ClientInvocation::create(
1420 client_context_,
1421 std::make_shared<protocol::ClientMessage>(std::move(request)),
1422 object_name_)
1423 ->invoke();
1424 } catch (exception::iexception&) {
1425 util::exception_util::rethrow(std::current_exception());
1426 return boost::make_ready_future(protocol::ClientMessage(0));
1427 }
1428}
1429
1430boost::future<protocol::ClientMessage>
1431SerializingProxy::invoke_on_connection(
1432 protocol::ClientMessage& request,
1433 std::shared_ptr<connection::Connection> connection)
1434{
1435 try {
1436 return spi::impl::ClientInvocation::create(
1437 client_context_,
1438 std::make_shared<protocol::ClientMessage>(std::move(request)),
1439 object_name_,
1440 connection)
1441 ->invoke();
1442 } catch (exception::iexception&) {
1443 util::exception_util::rethrow(std::current_exception());
1444 return boost::make_ready_future(protocol::ClientMessage(0));
1445 }
1446}
1447
1448boost::future<protocol::ClientMessage>
1449SerializingProxy::invoke_on_key_owner(
1450 protocol::ClientMessage& request,
1451 const serialization::pimpl::data& key_data)
1452{
1453 try {
1454 return invoke_on_partition(request, get_partition_id(key_data));
1455 } catch (exception::iexception&) {
1456 util::exception_util::rethrow(std::current_exception());
1457 return boost::make_ready_future(protocol::ClientMessage(0));
1458 }
1459}
1460
1461boost::future<protocol::ClientMessage>
1462SerializingProxy::invoke_on_member(protocol::ClientMessage& request,
1463 boost::uuids::uuid uuid)
1464{
1465 try {
1466 auto invocation = spi::impl::ClientInvocation::create(
1467 client_context_,
1468 std::make_shared<protocol::ClientMessage>(std::move(request)),
1469 object_name_,
1470 uuid);
1471 return invocation->invoke();
1472 } catch (exception::iexception&) {
1473 util::exception_util::rethrow(std::current_exception());
1474 return boost::make_ready_future(protocol::ClientMessage(0));
1475 }
1476}
1477
1478template<>
1479boost::future<boost::optional<serialization::pimpl::data>>
1480SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request)
1481{
1482 return decode_optional_var_sized<serialization::pimpl::data>(
1483 invoke(request));
1484}
1485
1486template<>
1487boost::future<boost::optional<map::data_entry_view>>
1488SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1489 const serialization::pimpl::data& key)
1490{
1491 return decode_optional_var_sized<map::data_entry_view>(
1492 invoke_on_key_owner(request, key));
1493}
1494
1495template<>
1496boost::future<boost::optional<serialization::pimpl::data>>
1497SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1498 int partition_id)
1499{
1500 return decode_optional_var_sized<serialization::pimpl::data>(
1501 invoke_on_partition(request, partition_id));
1502}
1503
1504template<>
1505boost::future<boost::optional<serialization::pimpl::data>>
1506SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1507 const serialization::pimpl::data& key)
1508{
1509 return decode_optional_var_sized<serialization::pimpl::data>(
1510 invoke_on_key_owner(request, key));
1511}
1512
1513PartitionSpecificClientProxy::PartitionSpecificClientProxy(
1514 const std::string& service_name,
1515 const std::string& object_name,
1516 spi::ClientContext* context)
1517 : ProxyImpl(service_name, object_name, context)
1518 , partition_id_(-1)
1519{}
1520
1521void
1522PartitionSpecificClientProxy::on_initialize()
1523{
1524 std::string partitionKey = internal::partition::strategy::
1525 StringPartitioningStrategy::get_partition_key(name_);
1526 partition_id_ = get_context().get_partition_service().get_partition_id(
1527 to_data<std::string>(partitionKey));
1528}
1529
1530IMapImpl::IMapImpl(const std::string& instance_name,
1531 spi::ClientContext* context)
1532 : ProxyImpl("hz:impl:mapService", instance_name, context)
1533{}
1534
1535boost::future<bool>
1536IMapImpl::contains_key(const serialization::pimpl::data& key)
1537{
1538 auto request = protocol::codec::map_containskey_encode(
1539 get_name(), key, util::get_current_thread_id());
1540 return invoke_and_get_future<bool>(request, key);
1541}
1542
1543boost::future<bool>
1544IMapImpl::contains_value(const serialization::pimpl::data& value)
1545{
1546 auto request = protocol::codec::map_containsvalue_encode(get_name(), value);
1547 return invoke_and_get_future<bool>(request);
1548}
1549
1550boost::future<boost::optional<serialization::pimpl::data>>
1551IMapImpl::get_data(const serialization::pimpl::data& key)
1552{
1553 auto request = protocol::codec::map_get_encode(
1554 get_name(), key, util::get_current_thread_id());
1555 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1556 request, key);
1557}
1558
1559boost::future<boost::optional<serialization::pimpl::data>>
1560IMapImpl::remove_data(const serialization::pimpl::data& key)
1561{
1562 auto request = protocol::codec::map_remove_encode(
1563 get_name(), key, util::get_current_thread_id());
1564 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1565 request, key);
1566}
1567
1568boost::future<bool>
1569IMapImpl::remove(const serialization::pimpl::data& key,
1570 const serialization::pimpl::data& value)
1571{
1572 auto request = protocol::codec::map_removeifsame_encode(
1573 get_name(), key, value, util::get_current_thread_id());
1574 return invoke_and_get_future<bool>(request, key);
1575}
1576
1577boost::future<protocol::ClientMessage>
1578IMapImpl::remove_all(const serialization::pimpl::data& predicate_data)
1579{
1580 auto request =
1581 protocol::codec::map_removeall_encode(get_name(), predicate_data);
1582 return invoke(request);
1583}
1584
1585boost::future<protocol::ClientMessage>
1586IMapImpl::delete_entry(const serialization::pimpl::data& key)
1587{
1588 auto request = protocol::codec::map_delete_encode(
1589 get_name(), key, util::get_current_thread_id());
1590 return invoke_on_partition(request, get_partition_id(key));
1591}
1592
1593boost::future<protocol::ClientMessage>
1594IMapImpl::flush()
1595{
1596 auto request = protocol::codec::map_flush_encode(get_name());
1597 return invoke(request);
1598}
1599
1600boost::future<bool>
1601IMapImpl::try_remove(const serialization::pimpl::data& key,
1602 std::chrono::milliseconds timeout)
1603{
1604 auto request = protocol::codec::map_tryremove_encode(
1605 get_name(),
1606 key,
1607 util::get_current_thread_id(),
1608 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1609
1610 return invoke_and_get_future<bool>(request, key);
1611}
1612
1613boost::future<bool>
1614IMapImpl::try_put(const serialization::pimpl::data& key,
1615 const serialization::pimpl::data& value,
1616 std::chrono::milliseconds timeout)
1617{
1618 auto request = protocol::codec::map_tryput_encode(
1619 get_name(),
1620 key,
1621 value,
1622 util::get_current_thread_id(),
1623 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1624
1625 return invoke_and_get_future<bool>(request, key);
1626}
1627
1628boost::future<boost::optional<serialization::pimpl::data>>
1629IMapImpl::put_data(const serialization::pimpl::data& key,
1630 const serialization::pimpl::data& value,
1631 std::chrono::milliseconds ttl)
1632{
1633 auto request = protocol::codec::map_put_encode(
1634 get_name(),
1635 key,
1636 value,
1637 util::get_current_thread_id(),
1638 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1639 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1640 request, key);
1641}
1642
1643boost::future<protocol::ClientMessage>
1644IMapImpl::put_transient(const serialization::pimpl::data& key,
1645 const serialization::pimpl::data& value,
1646 std::chrono::milliseconds ttl)
1647{
1648 auto request = protocol::codec::map_puttransient_encode(
1649 get_name(),
1650 key,
1651 value,
1652 util::get_current_thread_id(),
1653 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1654 return invoke_on_partition(request, get_partition_id(key));
1655}
1656
1657boost::future<boost::optional<serialization::pimpl::data>>
1658IMapImpl::put_if_absent_data(const serialization::pimpl::data& key,
1659 const serialization::pimpl::data& value,
1660 std::chrono::milliseconds ttl)
1661{
1662 auto request = protocol::codec::map_putifabsent_encode(
1663 get_name(),
1664 key,
1665 value,
1666 util::get_current_thread_id(),
1667 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1668 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1669 request, key);
1670}
1671
1672boost::future<bool>
1673IMapImpl::replace(const serialization::pimpl::data& key,
1674 const serialization::pimpl::data& old_value,
1675 const serialization::pimpl::data& new_value)
1676{
1677 auto request = protocol::codec::map_replaceifsame_encode(
1678 get_name(), key, old_value, new_value, util::get_current_thread_id());
1679
1680 return invoke_and_get_future<bool>(request, key);
1681}
1682
1683boost::future<boost::optional<serialization::pimpl::data>>
1684IMapImpl::replace_data(const serialization::pimpl::data& key,
1685 const serialization::pimpl::data& value)
1686{
1687 auto request = protocol::codec::map_replace_encode(
1688 get_name(), key, value, util::get_current_thread_id());
1689
1690 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1691 request, key);
1692}
1693
1694boost::future<protocol::ClientMessage>
1695IMapImpl::set(const serialization::pimpl::data& key,
1696 const serialization::pimpl::data& value,
1697 std::chrono::milliseconds ttl)
1698{
1699 auto request = protocol::codec::map_set_encode(
1700 get_name(),
1701 key,
1702 value,
1703 util::get_current_thread_id(),
1704 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1705 return invoke_on_partition(request, get_partition_id(key));
1706}
1707
1708boost::future<protocol::ClientMessage>
1709IMapImpl::lock(const serialization::pimpl::data& key)
1710{
1711 return lock(key, std::chrono::milliseconds(-1));
1712}
1713
1714boost::future<protocol::ClientMessage>
1715IMapImpl::lock(const serialization::pimpl::data& key,
1716 std::chrono::milliseconds lease_time)
1717{
1718 auto request = protocol::codec::map_lock_encode(
1719 get_name(),
1720 key,
1721 util::get_current_thread_id(),
1722 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1723 lock_reference_id_generator_->get_next_reference_id());
1724 return invoke_on_partition(request, get_partition_id(key));
1725}
1726
1727boost::future<bool>
1728IMapImpl::is_locked(const serialization::pimpl::data& key)
1729{
1730 auto request = protocol::codec::map_islocked_encode(get_name(), key);
1731
1732 return invoke_and_get_future<bool>(request, key);
1733}
1734
1735boost::future<bool>
1736IMapImpl::try_lock(const serialization::pimpl::data& key,
1737 std::chrono::milliseconds timeout)
1738{
1739 return try_lock(key, timeout, std::chrono::milliseconds(-1));
1740}
1741
1742boost::future<bool>
1743IMapImpl::try_lock(const serialization::pimpl::data& key,
1744 std::chrono::milliseconds timeout,
1745 std::chrono::milliseconds lease_time)
1746{
1747 auto request = protocol::codec::map_trylock_encode(
1748 get_name(),
1749 key,
1750 util::get_current_thread_id(),
1751 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1752 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
1753 lock_reference_id_generator_->get_next_reference_id());
1754 return invoke_and_get_future<bool>(request, key);
1755}
1756
1757boost::future<protocol::ClientMessage>
1758IMapImpl::unlock(const serialization::pimpl::data& key)
1759{
1760 auto request = protocol::codec::map_unlock_encode(
1761 get_name(),
1762 key,
1763 util::get_current_thread_id(),
1764 lock_reference_id_generator_->get_next_reference_id());
1765 return invoke_on_partition(request, get_partition_id(key));
1766}
1767
1768boost::future<protocol::ClientMessage>
1769IMapImpl::force_unlock(const serialization::pimpl::data& key)
1770{
1771 auto request = protocol::codec::map_forceunlock_encode(
1772 get_name(), key, lock_reference_id_generator_->get_next_reference_id());
1773 return invoke_on_partition(request, get_partition_id(key));
1774}
1775
1776boost::future<std::string>
1777IMapImpl::add_interceptor(const serialization::pimpl::data& interceptor)
1778{
1779 auto request =
1780 protocol::codec::map_addinterceptor_encode(get_name(), interceptor);
1781 return invoke_and_get_future<std::string>(request);
1782}
1783
1784// TODO: We can use generic template Listener instead of impl::BaseEventHandler
1785// to prevent the virtual function calls
1786boost::future<boost::uuids::uuid>
1787IMapImpl::add_entry_listener(
1788 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1789 bool include_value,
1790 int32_t listener_flags)
1791{
1792 return register_listener(
1793 create_map_entry_listener_codec(include_value, listener_flags),
1794 std::move(entry_event_handler));
1795}
1796
1797boost::future<boost::uuids::uuid>
1798IMapImpl::add_entry_listener(
1799 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1800 serialization::pimpl::data&& predicate,
1801 bool include_value,
1802 int32_t listener_flags)
1803{
1804 return register_listener(
1805 create_map_entry_listener_codec(
1806 include_value, std::move(predicate), listener_flags),
1807 std::move(entry_event_handler));
1808}
1809
1810boost::future<bool>
1811IMapImpl::remove_entry_listener(boost::uuids::uuid registration_id)
1812{
1813 return get_context().get_client_listener_service().deregister_listener(
1814 registration_id);
1815}
1816
1817boost::future<boost::uuids::uuid>
1818IMapImpl::add_entry_listener(
1819 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1820 bool include_value,
1821 serialization::pimpl::data&& key,
1822 int32_t listener_flags)
1823{
1824 return register_listener(create_map_entry_listener_codec(
1825 include_value, listener_flags, std::move(key)),
1826 std::move(entry_event_handler));
1827}
1828
1829boost::future<boost::optional<map::data_entry_view>>
1830IMapImpl::get_entry_view_data(const serialization::pimpl::data& key)
1831{
1832 auto request = protocol::codec::map_getentryview_encode(
1833 get_name(), key, util::get_current_thread_id());
1834 return invoke_and_get_future<boost::optional<map::data_entry_view>>(request,
1835 key);
1836}
1837
1838boost::future<bool>
1839IMapImpl::evict(const serialization::pimpl::data& key)
1840{
1841 auto request = protocol::codec::map_evict_encode(
1842 get_name(), key, util::get_current_thread_id());
1843 return invoke_and_get_future<bool>(request, key);
1844}
1845
1846boost::future<protocol::ClientMessage>
1847IMapImpl::evict_all()
1848{
1849 auto request = protocol::codec::map_evictall_encode(get_name());
1850 return invoke(request);
1851}
1852
1853boost::future<EntryVector>
1854IMapImpl::get_all_data(int partition_id,
1855 const std::vector<serialization::pimpl::data>& keys)
1856{
1857 auto request = protocol::codec::map_getall_encode(get_name(), keys);
1858 return invoke_and_get_future<EntryVector>(request, partition_id);
1859}
1860
1861boost::future<std::vector<serialization::pimpl::data>>
1862IMapImpl::key_set_data()
1863{
1864 auto request = protocol::codec::map_keyset_encode(get_name());
1865 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1866 request);
1867}
1868
1869boost::future<std::vector<serialization::pimpl::data>>
1870IMapImpl::key_set_data(const serialization::pimpl::data& predicate)
1871{
1872 auto request =
1873 protocol::codec::map_keysetwithpredicate_encode(get_name(), predicate);
1874 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1875 request);
1876}
1877
1878boost::future<
1879 std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1880IMapImpl::key_set_for_paging_predicate_data(
1881 protocol::codec::holder::paging_predicate_holder const& predicate)
1882{
1883 auto request = protocol::codec::map_keysetwithpagingpredicate_encode(
1884 get_name(), predicate);
1885 return invoke(request).then(
1886 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1887 return get_paging_predicate_response<
1888 std::vector<serialization::pimpl::data>>(std::move(f));
1889 });
1890}
1891
1892boost::future<EntryVector>
1893IMapImpl::entry_set_data()
1894{
1895 auto request = protocol::codec::map_entryset_encode(get_name());
1896 return invoke_and_get_future<EntryVector>(request);
1897}
1898
1899boost::future<EntryVector>
1900IMapImpl::entry_set_data(const serialization::pimpl::data& predicate)
1901{
1902 auto request =
1903 protocol::codec::map_entrieswithpredicate_encode(get_name(), predicate);
1904 return invoke_and_get_future<EntryVector>(request);
1905}
1906
1907boost::future<std::pair<EntryVector, query::anchor_data_list>>
1908IMapImpl::entry_set_for_paging_predicate_data(
1909 protocol::codec::holder::paging_predicate_holder const& predicate)
1910{
1911 auto request = protocol::codec::map_entrieswithpagingpredicate_encode(
1912 get_name(), predicate);
1913 return invoke(request).then(
1914 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1915 return get_paging_predicate_response<EntryVector>(std::move(f));
1916 });
1917}
1918
1919boost::future<std::vector<serialization::pimpl::data>>
1920IMapImpl::values_data()
1921{
1922 auto request = protocol::codec::map_values_encode(get_name());
1923 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1924 request);
1925}
1926
1927boost::future<std::vector<serialization::pimpl::data>>
1928IMapImpl::values_data(const serialization::pimpl::data& predicate)
1929{
1930 auto request =
1931 protocol::codec::map_valueswithpredicate_encode(get_name(), predicate);
1932 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1933 request);
1934}
1935
1936boost::future<
1937 std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1938IMapImpl::values_for_paging_predicate_data(
1939 protocol::codec::holder::paging_predicate_holder const& predicate)
1940{
1941 auto request = protocol::codec::map_valueswithpagingpredicate_encode(
1942 get_name(), predicate);
1943 return invoke(request).then(
1944 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1945 return get_paging_predicate_response<
1946 std::vector<serialization::pimpl::data>>(std::move(f));
1947 });
1948}
1949
1950boost::future<protocol::ClientMessage>
1951IMapImpl::add_index_data(const config::index_config& config)
1952{
1953 auto request = protocol::codec::map_addindex_encode(get_name(), config);
1954 return invoke(request);
1955}
1956
1957boost::future<int>
1958IMapImpl::size()
1959{
1960 auto request = protocol::codec::map_size_encode(get_name());
1961 return invoke_and_get_future<int>(request);
1962}
1963
1964boost::future<bool>
1965IMapImpl::is_empty()
1966{
1967 auto request = protocol::codec::map_isempty_encode(get_name());
1968 return invoke_and_get_future<bool>(request);
1969}
1970
1971boost::future<protocol::ClientMessage>
1972IMapImpl::put_all_data(int partition_id, const EntryVector& entries)
1973{
1974 auto request =
1975 protocol::codec::map_putall_encode(get_name(), entries, true);
1976 return invoke_on_partition(request, partition_id);
1977}
1978
1979boost::future<protocol::ClientMessage>
1980IMapImpl::clear_data()
1981{
1982 auto request = protocol::codec::map_clear_encode(get_name());
1983 return invoke(request);
1984}
1985
1986boost::future<boost::optional<serialization::pimpl::data>>
1987IMapImpl::execute_on_key_data(const serialization::pimpl::data& key,
1988 const serialization::pimpl::data& processor)
1989{
1990 auto request = protocol::codec::map_executeonkey_encode(
1991 get_name(), processor, key, util::get_current_thread_id());
1992 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1993 request, get_partition_id(key));
1994}
1995
1996boost::future<boost::optional<serialization::pimpl::data>>
1997IMapImpl::submit_to_key_data(const serialization::pimpl::data& key,
1998 const serialization::pimpl::data& processor)
1999{
2000 auto request = protocol::codec::map_submittokey_encode(
2001 get_name(), processor, key, util::get_current_thread_id());
2002 return invoke_on_partition(request, get_partition_id(key))
2003 .then(boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
2004 auto msg = f.get();
2005 msg.skip_frame();
2006 return msg.get_nullable<serialization::pimpl::data>();
2007 });
2008}
2009
2010boost::future<EntryVector>
2011IMapImpl::execute_on_keys_data(
2012 const std::vector<serialization::pimpl::data>& keys,
2013 const serialization::pimpl::data& processor)
2014{
2015 auto request =
2016 protocol::codec::map_executeonkeys_encode(get_name(), processor, keys);
2017 return invoke_and_get_future<EntryVector>(request);
2018}
2019
2020boost::future<protocol::ClientMessage>
2021IMapImpl::remove_interceptor(const std::string& id)
2022{
2023 auto request =
2024 protocol::codec::map_removeinterceptor_encode(get_name(), id);
2025 return invoke(request);
2026}
2027
2028boost::future<EntryVector>
2029IMapImpl::execute_on_entries_data(
2030 const serialization::pimpl::data& entry_processor)
2031{
2032 auto request =
2033 protocol::codec::map_executeonallkeys_encode(get_name(), entry_processor);
2034 return invoke_and_get_future<EntryVector>(request);
2035}
2036
2037boost::future<EntryVector>
2038IMapImpl::execute_on_entries_data(
2039 const serialization::pimpl::data& entry_processor,
2040 const serialization::pimpl::data& predicate)
2041{
2042 auto request = protocol::codec::map_executewithpredicate_encode(
2043 get_name(), entry_processor, predicate);
2044 return invoke_and_get_future<EntryVector>(request);
2045}
2046
2047std::shared_ptr<spi::impl::ListenerMessageCodec>
2048IMapImpl::create_map_entry_listener_codec(
2049 bool include_value,
2050 serialization::pimpl::data&& predicate,
2051 int32_t listener_flags)
2052{
2053 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2054 new MapEntryListenerWithPredicateMessageCodec(
2055 get_name(), include_value, listener_flags, std::move(predicate)));
2056}
2057
2058std::shared_ptr<spi::impl::ListenerMessageCodec>
2059IMapImpl::create_map_entry_listener_codec(bool include_value,
2060 int32_t listener_flags)
2061{
2062 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2063 new MapEntryListenerMessageCodec(
2064 get_name(), include_value, listener_flags));
2065}
2066
2067std::shared_ptr<spi::impl::ListenerMessageCodec>
2068IMapImpl::create_map_entry_listener_codec(bool include_value,
2069 int32_t listener_flags,
2070 serialization::pimpl::data&& key)
2071{
2072 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2073 new MapEntryListenerToKeyCodec(
2074 get_name(), include_value, listener_flags, std::move(key)));
2075}
2076
2077void
2078IMapImpl::on_initialize()
2079{
2080 ProxyImpl::on_initialize();
2081 lock_reference_id_generator_ =
2082 get_context().get_lock_reference_id_generator();
2083}
2084
2085IMapImpl::MapEntryListenerMessageCodec::MapEntryListenerMessageCodec(
2086 std::string name,
2087 bool include_value,
2088 int32_t listener_flags)
2089 : name_(std::move(name))
2090 , include_value_(include_value)
2091 , listener_flags_(listener_flags)
2092{}
2093
2094protocol::ClientMessage
2095IMapImpl::MapEntryListenerMessageCodec::encode_add_request(
2096 bool local_only) const
2097{
2098 return protocol::codec::map_addentrylistener_encode(
2099 name_, include_value_, static_cast<int32_t>(listener_flags_), local_only);
2100}
2101
2102protocol::ClientMessage
2103IMapImpl::MapEntryListenerMessageCodec::encode_remove_request(
2104 boost::uuids::uuid real_registration_id) const
2105{
2106 return protocol::codec::map_removeentrylistener_encode(
2107 name_, real_registration_id);
2108}
2109
2110protocol::ClientMessage
2111IMapImpl::MapEntryListenerToKeyCodec::encode_add_request(bool local_only) const
2112{
2113 return protocol::codec::map_addentrylistenertokey_encode(
2114 name_,
2115 key_,
2116 include_value_,
2117 static_cast<int32_t>(listener_flags_),
2118 local_only);
2119}
2120
2121protocol::ClientMessage
2122IMapImpl::MapEntryListenerToKeyCodec::encode_remove_request(
2123 boost::uuids::uuid real_registration_id) const
2124{
2125 return protocol::codec::map_removeentrylistener_encode(
2126 name_, real_registration_id);
2127}
2128
2129IMapImpl::MapEntryListenerToKeyCodec::MapEntryListenerToKeyCodec(
2130 std::string name,
2131 bool include_value,
2132 int32_t listener_flags,
2133 serialization::pimpl::data key)
2134 : name_(std::move(name))
2135 , include_value_(include_value)
2136 , listener_flags_(listener_flags)
2137 , key_(std::move(key))
2138{}
2139
2140IMapImpl::MapEntryListenerWithPredicateMessageCodec::
2141 MapEntryListenerWithPredicateMessageCodec(
2142 std::string name,
2143 bool include_value,
2144 int32_t listener_flags,
2145 serialization::pimpl::data&& predicate)
2146 : name_(std::move(name))
2147 , include_value_(include_value)
2148 , listener_flags_(listener_flags)
2149 , predicate_(std::move(predicate))
2150{}
2151
2152protocol::ClientMessage
2153IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_add_request(
2154 bool local_only) const
2155{
2156 return protocol::codec::map_addentrylistenerwithpredicate_encode(
2157 name_,
2158 predicate_,
2159 include_value_,
2160 static_cast<int32_t>(listener_flags_),
2161 local_only);
2162}
2163
2164protocol::ClientMessage
2165IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_remove_request(
2166 boost::uuids::uuid real_registration_id) const
2167{
2168 return protocol::codec::map_removeentrylistener_encode(
2169 name_, real_registration_id);
2170}
2171
2172TransactionalQueueImpl::TransactionalQueueImpl(
2173 const std::string& name,
2174 txn::TransactionProxy& transaction_proxy)
2175 : TransactionalObject(iqueue::SERVICE_NAME, name, transaction_proxy)
2176{}
2177
2178boost::future<bool>
2179TransactionalQueueImpl::offer(const serialization::pimpl::data& e,
2180 std::chrono::milliseconds timeout)
2181{
2182 auto request = protocol::codec::transactionalqueue_offer_encode(
2183 get_name(),
2184 get_transaction_id(),
2185 util::get_current_thread_id(),
2186 e,
2187 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
2188
2189 return invoke_and_get_future<bool>(request);
2190}
2191
2192boost::future<boost::optional<serialization::pimpl::data>>
2193TransactionalQueueImpl::poll_data(std::chrono::milliseconds timeout)
2194{
2195 auto request = protocol::codec::transactionalqueue_poll_encode(
2196 get_name(),
2197 get_transaction_id(),
2198 util::get_current_thread_id(),
2199 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
2200
2201 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
2202 request);
2203}
2204
2205boost::future<int>
2206TransactionalQueueImpl::size()
2207{
2208 auto request = protocol::codec::transactionalqueue_size_encode(
2209 get_name(), get_transaction_id(), util::get_current_thread_id());
2210
2211 return invoke_and_get_future<int>(request);
2212}
2213
2214ISetImpl::ISetImpl(const std::string& instance_name,
2215 spi::ClientContext* client_context)
2216 : ProxyImpl(iset::SERVICE_NAME, instance_name, client_context)
2217{
2218 serialization::pimpl::data key_data =
2219 get_context().get_serialization_service().to_data<std::string>(
2220 &instance_name);
2221 partition_id_ = get_partition_id(key_data);
2222}
2223
2224boost::future<bool>
2225ISetImpl::remove_item_listener(boost::uuids::uuid registration_id)
2226{
2227 return get_context().get_client_listener_service().deregister_listener(
2228 registration_id);
2229}
2230
2231boost::future<int>
2232ISetImpl::size()
2233{
2234 auto request = protocol::codec::set_size_encode(get_name());
2235 return invoke_and_get_future<int>(request, partition_id_);
2236}
2237
2238boost::future<bool>
2239ISetImpl::is_empty()
2240{
2241 auto request = protocol::codec::set_isempty_encode(get_name());
2242 return invoke_and_get_future<bool>(request, partition_id_);
2243}
2244
2245boost::future<bool>
2246ISetImpl::contains(const serialization::pimpl::data& element)
2247{
2248 auto request = protocol::codec::set_contains_encode(get_name(), element);
2249 return invoke_and_get_future<bool>(request, partition_id_);
2250}
2251
2252boost::future<std::vector<serialization::pimpl::data>>
2253ISetImpl::to_array_data()
2254{
2255 auto request = protocol::codec::set_getall_encode(get_name());
2256 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
2257 request, partition_id_);
2258}
2259
2260boost::future<bool>
2261ISetImpl::add(const serialization::pimpl::data& element)
2262{
2263 auto request = protocol::codec::set_add_encode(get_name(), element);
2264 return invoke_and_get_future<bool>(request, partition_id_);
2265}
2266
2267boost::future<bool>
2268ISetImpl::remove(const serialization::pimpl::data& element)
2269{
2270 auto request = protocol::codec::set_remove_encode(get_name(), element);
2271 return invoke_and_get_future<bool>(request, partition_id_);
2272}
2273
2274boost::future<bool>
2275ISetImpl::contains_all(const std::vector<serialization::pimpl::data>& elements)
2276{
2277 auto request =
2278 protocol::codec::set_containsall_encode(get_name(), elements);
2279 return invoke_and_get_future<bool>(request, partition_id_);
2280}
2281
2282boost::future<bool>
2283ISetImpl::add_all(const std::vector<serialization::pimpl::data>& elements)
2284{
2285 auto request = protocol::codec::set_addall_encode(get_name(), elements);
2286 return invoke_and_get_future<bool>(request, partition_id_);
2287}
2288
2289boost::future<bool>
2290ISetImpl::remove_all(const std::vector<serialization::pimpl::data>& elements)
2291{
2292 auto request =
2293 protocol::codec::set_compareandremoveall_encode(get_name(), elements);
2294 return invoke_and_get_future<bool>(request, partition_id_);
2295}
2296
2297boost::future<bool>
2298ISetImpl::retain_all(const std::vector<serialization::pimpl::data>& elements)
2299{
2300 auto request =
2301 protocol::codec::set_compareandretainall_encode(get_name(), elements);
2302 return invoke_and_get_future<bool>(request, partition_id_);
2303}
2304
2305boost::future<void>
2306ISetImpl::clear()
2307{
2308 auto request = protocol::codec::set_clear_encode(get_name());
2309 return to_void_future(invoke_on_partition(request, partition_id_));
2310}
2311
2312std::shared_ptr<spi::impl::ListenerMessageCodec>
2313ISetImpl::create_item_listener_codec(bool include_value)
2314{
2315 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2316 new SetListenerMessageCodec(get_name(), include_value));
2317}
2318
2319ISetImpl::SetListenerMessageCodec::SetListenerMessageCodec(std::string name,
2320 bool include_value)
2321 : name_(std::move(name))
2322 , include_value_(include_value)
2323{}
2324
2325protocol::ClientMessage
2326ISetImpl::SetListenerMessageCodec::encode_add_request(bool local_only) const
2327{
2328 return protocol::codec::set_addlistener_encode(
2329 name_, include_value_, local_only);
2330}
2331
2332protocol::ClientMessage
2333ISetImpl::SetListenerMessageCodec::encode_remove_request(
2334 boost::uuids::uuid real_registration_id) const
2335{
2336 return protocol::codec::set_removelistener_encode(name_,
2337 real_registration_id);
2338}
2339
2340ITopicImpl::ITopicImpl(const std::string& instance_name,
2341 spi::ClientContext* context)
2342 : proxy::ProxyImpl("hz:impl:topicService", instance_name, context)
2343 , partition_id_(get_partition_id(to_data(instance_name)))
2344{}
2345
2346boost::future<void>
2347ITopicImpl::publish(const serialization::pimpl::data& data)
2348{
2349 auto request = protocol::codec::topic_publish_encode(get_name(), data);
2350 return to_void_future(invoke_on_partition(request, partition_id_));
2351}
2352
2353boost::future<boost::uuids::uuid>
2354ITopicImpl::add_message_listener(
2355 std::shared_ptr<impl::BaseEventHandler> topic_event_handler)
2356{
2357 return register_listener(create_item_listener_codec(),
2358 std::move(topic_event_handler));
2359}
2360
2361boost::future<bool>
2362ITopicImpl::remove_message_listener(boost::uuids::uuid registration_id)
2363{
2364 return get_context().get_client_listener_service().deregister_listener(
2365 registration_id);
2366}
2367
2368std::shared_ptr<spi::impl::ListenerMessageCodec>
2369ITopicImpl::create_item_listener_codec()
2370{
2371 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2372 new TopicListenerMessageCodec(get_name()));
2373}
2374
2375ITopicImpl::TopicListenerMessageCodec::TopicListenerMessageCodec(
2376 std::string name)
2377 : name_(std::move(name))
2378{}
2379
2380protocol::ClientMessage
2381ITopicImpl::TopicListenerMessageCodec::encode_add_request(bool local_only) const
2382{
2383 return protocol::codec::topic_addmessagelistener_encode(name_, local_only);
2384}
2385
2386protocol::ClientMessage
2387ITopicImpl::TopicListenerMessageCodec::encode_remove_request(
2388 boost::uuids::uuid real_registration_id) const
2389{
2390 return protocol::codec::topic_removemessagelistener_encode(
2391 name_, real_registration_id);
2392}
2393
2394ReplicatedMapImpl::ReplicatedMapImpl(const std::string& service_name,
2395 const std::string& object_name,
2396 spi::ClientContext* context)
2397 : ProxyImpl(service_name, object_name, context)
2398 , target_partition_id_(-1)
2399{}
2400
2401const int32_t RingbufferImpl::MAX_BATCH_SIZE{ 1000 };
2402} // namespace proxy
2403
2404namespace map {
2405const serialization::pimpl::data&
2406data_entry_view::get_key() const
2407{
2408 return key_;
2409}
2410
2411const serialization::pimpl::data&
2412data_entry_view::get_value() const
2413{
2414 return value_;
2415}
2416
2417int64_t
2418data_entry_view::get_cost() const
2419{
2420 return cost_;
2421}
2422
2423int64_t
2424data_entry_view::get_creation_time() const
2425{
2426 return creation_time_;
2427}
2428
2429int64_t
2430data_entry_view::get_expiration_time() const
2431{
2432 return expiration_time_;
2433}
2434
2435int64_t
2436data_entry_view::get_hits() const
2437{
2438 return hits_;
2439}
2440
2441int64_t
2442data_entry_view::get_last_access_time() const
2443{
2444 return last_access_time_;
2445}
2446
2447int64_t
2448data_entry_view::get_last_stored_time() const
2449{
2450 return last_stored_time_;
2451}
2452
2453int64_t
2454data_entry_view::get_last_update_time() const
2455{
2456 return last_update_time_;
2457}
2458
2459int64_t
2460data_entry_view::get_version() const
2461{
2462 return version_;
2463}
2464
2465int64_t
2466data_entry_view::get_ttl() const
2467{
2468 return ttl_;
2469}
2470
2471int64_t
2472data_entry_view::get_max_idle() const
2473{
2474 return max_idle_;
2475}
2476
2477data_entry_view::data_entry_view(serialization::pimpl::data&& key,
2478 serialization::pimpl::data&& value,
2479 int64_t cost,
2480 int64_t creation_time,
2481 int64_t expiration_time,
2482 int64_t hits,
2483 int64_t last_access_time,
2484 int64_t last_stored_time,
2485 int64_t last_update_time,
2486 int64_t version,
2487 int64_t ttl,
2488 int64_t max_idle)
2489 : key_(std::move(key))
2490 , value_(std::move(value))
2491 , cost_(cost)
2492 , creation_time_(creation_time)
2493 , expiration_time_(expiration_time)
2494 , hits_(hits)
2495 , last_access_time_(last_access_time)
2496 , last_stored_time_(last_stored_time)
2497 , last_update_time_(last_update_time)
2498 , version_(version)
2499 , ttl_(ttl)
2500 , max_idle_(max_idle)
2501{}
2502} // namespace map
2503
2504namespace topic {
2505namespace impl {
2506namespace reliable {
2507ReliableTopicMessage::ReliableTopicMessage()
2508 : publish_time_(std::chrono::system_clock::now())
2509{}
2510
2511ReliableTopicMessage::ReliableTopicMessage(
2512 hazelcast::client::serialization::pimpl::data&& payload_data,
2513 std::unique_ptr<address> address)
2514 : publish_time_(std::chrono::system_clock::now())
2515 , payload_(std::move(payload_data))
2516{
2517 if (address) {
2518 publisher_address_ = boost::make_optional(*address);
2519 }
2520}
2521
2522std::chrono::system_clock::time_point
2523ReliableTopicMessage::get_publish_time() const
2524{
2525 return publish_time_;
2526}
2527
2528const boost::optional<address>&
2529ReliableTopicMessage::get_publisher_address() const
2530{
2531 return publisher_address_;
2532}
2533
2534serialization::pimpl::data&
2535ReliableTopicMessage::get_payload()
2536{
2537 return payload_;
2538}
2539} // namespace reliable
2540} // namespace impl
2541} // namespace topic
2542
2543namespace serialization {
2544int32_t
2545hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_factory_id()
2546{
2547 return F_ID;
2548}
2549
2550int
2551hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_class_id()
2552{
2553 return RELIABLE_TOPIC_MESSAGE;
2554}
2555
2556void
2557hz_serializer<topic::impl::reliable::ReliableTopicMessage>::write_data(
2558 const topic::impl::reliable::ReliableTopicMessage& object,
2559 object_data_output& out)
2560{
2561 out.write<int64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(
2562 object.publish_time_.time_since_epoch())
2563 .count());
2564 out.write_object(object.publisher_address_);
2565 out.write(object.payload_.to_byte_array());
2566}
2567
2568topic::impl::reliable::ReliableTopicMessage
2569hz_serializer<topic::impl::reliable::ReliableTopicMessage>::read_data(
2570 object_data_input& in)
2571{
2572 topic::impl::reliable::ReliableTopicMessage message;
2573 auto now = std::chrono::system_clock::now();
2574 message.publish_time_ = now +
2575 std::chrono::milliseconds(in.read<int64_t>()) -
2576 now.time_since_epoch();
2577 message.publisher_address_ = in.read_object<address>();
2578 message.payload_ =
2579 serialization::pimpl::data(in.read<std::vector<byte>>().value());
2580 return message;
2581}
2582} // namespace serialization
2583
2584entry_event::entry_event(const std::string& name,
2585 member&& member,
2586 type event_type,
2587 typed_data&& key,
2588 typed_data&& value,
2589 typed_data&& old_value,
2590 typed_data&& merging_value)
2591 : name_(name)
2592 , member_(std::move(member))
2593 , event_type_(event_type)
2594 , key_(std::move(key))
2595 , value_(std::move(value))
2596 , old_value_(std::move(old_value))
2597 , merging_value_(std::move(merging_value))
2598{}
2599
2600const typed_data&
2602{
2603 return key_;
2604}
2605
2606const typed_data&
2608{
2609 return old_value_;
2610}
2611
2612const typed_data&
2614{
2615 return value_;
2616}
2617
2618const typed_data&
2620{
2621 return merging_value_;
2622}
2623
2624const member&
2626{
2627 return member_;
2628}
2629
2630entry_event::type
2632{
2633 return event_type_;
2634}
2635
2636const std::string&
2638{
2639 return name_;
2640}
2641
2642std::ostream&
2643operator<<(std::ostream& os, const entry_event& event)
2644{
2645 os << "name: " << event.name_ << " member: " << event.member_
2646 << " eventType: " << static_cast<int>(event.event_type_)
2647 << " key: " << event.key_.get_type()
2648 << " value: " << event.value_.get_type()
2649 << " oldValue: " << event.old_value_.get_type()
2650 << " mergingValue: " << event.merging_value_.get_type();
2651 return os;
2652}
2653
2655 entry_event::type event_type,
2656 const std::string& name,
2657 int number_of_entries_affected)
2658 : member_(member)
2659 , event_type_(event_type)
2660 , name_(name)
2661 , number_of_entries_affected_(number_of_entries_affected)
2662{}
2663
2664const member&
2666{
2667 return member_;
2668}
2669
2670entry_event::type
2672{
2673 return event_type_;
2674}
2675
2676const std::string&
2678{
2679 return name_;
2680}
2681
2682int
2684{
2685 return number_of_entries_affected_;
2686}
2687
2688std::ostream&
2689operator<<(std::ostream& os, const map_event& event)
2690{
2691 os << "MapEvent{member: " << event.member_
2692 << " eventType: " << static_cast<int>(event.event_type_)
2693 << " name: " << event.name_
2694 << " numberOfEntriesAffected: " << event.number_of_entries_affected_;
2695 return os;
2696}
2697
2698item_event_base::item_event_base(const std::string& name,
2699 const member& member,
2700 const item_event_type& event_type)
2701 : name_(name)
2702 , member_(member)
2703 , event_type_(event_type)
2704{}
2705
2706const member&
2708{
2709 return member_;
2710}
2711
2712item_event_type
2714{
2715 return event_type_;
2716}
2717
2718const std::string&
2720{
2721 return name_;
2722}
2723
2724item_event_base::~item_event_base() = default;
2725
2726flake_id_generator::flake_id_generator(const std::string& object_name,
2727 spi::ClientContext* context)
2728 : flake_id_generator_impl(SERVICE_NAME, object_name, context)
2729{}
2730} // namespace client
2731} // namespace hazelcast
const typed_data & get_key() const
Returns the key of the entry event.
Definition proxy.cpp:2601
const std::string & get_name() const
Returns the name of the map for this event.
Definition proxy.cpp:2637
const typed_data & get_old_value() const
Returns the old value of the entry event.
Definition proxy.cpp:2607
type get_event_type() const
Return the event type.
Definition proxy.cpp:2631
const member & get_member() const
Returns the member that fired this event.
Definition proxy.cpp:2625
const typed_data & get_value() const
Returns the value of the entry event.
Definition proxy.cpp:2613
const typed_data & get_merging_value() const
Returns the incoming merging value of the entry event.
Definition proxy.cpp:2619
static const std::chrono::milliseconds UNSET
Default TTL value of a record.
Definition imap.h:1136
item_event_type get_event_type() const
Return the event type.
Definition proxy.cpp:2713
const member & get_member() const
Returns the member that fired this event.
Definition proxy.cpp:2707
const std::string & get_name() const
Returns the name of the collection for this event.
Definition proxy.cpp:2719
Map events common contract.
Definition map_event.h:36
const std::string & get_name() const
Returns the name of the map for this event.
Definition proxy.cpp:2677
entry_event::type get_event_type() const
Return the event type.
Definition proxy.cpp:2671
int get_number_of_entries_affected() const
Returns the number of entries affected by this event.
Definition proxy.cpp:2683
map_event(member &&member, entry_event::type event_type, const std::string &name, int number_of_entries_affected)
Constructor.
Definition proxy.cpp:2654
const member & get_member() const
Returns the member that fired this event.
Definition proxy.cpp:2665
hz_cluster member class.
Definition member.h:62
Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subs...
bool remove_message_listener(const std::string &registration_id)
Stops receiving messages for the given message listener.
Definition proxy.cpp:66
reliable_listener(bool loss_tolerant, int64_t initial_sequence_id=-1)
Definition proxy.cpp:103
typed_data class is a wrapper class for the serialized binary data.
serialization::pimpl::object_type get_type() const
STL namespace.