Hazelcast C++ Client
Hazelcast C++ Client Library
Loading...
Searching...
No Matches
proxy.cpp
1/*
2 * Copyright (c) 2008-2025, Hazelcast, Inc. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <unordered_set>
18#include <atomic>
19
20#include "hazelcast/client/impl/ClientLockReferenceIdGenerator.h"
21#include "hazelcast/client/proxy/PNCounterImpl.h"
22#include "hazelcast/client/spi/ClientContext.h"
23#include "hazelcast/client/impl/hazelcast_client_instance_impl.h"
24#include "hazelcast/client/proxy/flake_id_generator_impl.h"
25#include "hazelcast/client/spi/impl/listener/listener_service_impl.h"
26#include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
27#include "hazelcast/client/client_config.h"
28#include "hazelcast/client/map/data_entry_view.h"
29#include "hazelcast/client/proxy/RingbufferImpl.h"
30#include "hazelcast/client/impl/vector_clock.h"
31#include "hazelcast/client/internal/partition/strategy/StringPartitioningStrategy.h"
32#include "hazelcast/util/Util.h"
33#include "hazelcast/client/topic/reliable_listener.h"
34#include "hazelcast/client/proxy/ITopicImpl.h"
35#include "hazelcast/client/proxy/ReplicatedMapImpl.h"
36#include "hazelcast/client/flake_id_generator.h"
37#include "hazelcast/client/reliable_topic.h"
38
39namespace hazelcast {
40namespace client {
41const std::chrono::milliseconds imap::UNSET{ -1 };
42
43reliable_topic::reliable_topic(const std::string& instance_name,
44 spi::ClientContext* context)
45 : proxy::ProxyImpl(reliable_topic::SERVICE_NAME, instance_name, context)
46 , execution_service_(
47 context->get_client_execution_service().shared_from_this())
48 , executor_(execution_service_->get_user_executor())
49 , logger_(context->get_logger())
50{
51 auto reliable_config =
52 context->get_client_config().lookup_reliable_topic_config(instance_name);
53 if (reliable_config) {
54 batch_size_ = reliable_config->get_read_batch_size();
55 } else {
56 batch_size_ = config::reliable_topic_config::DEFAULT_READ_BATCH_SIZE;
57 }
58
59 ringbuffer_ =
60 context->get_hazelcast_client_implementation()
61 ->get_distributed_object<ringbuffer>(
62 std::string(reliable_topic::TOPIC_RB_PREFIX) + instance_name);
63}
64
65bool
66reliable_topic::remove_message_listener(const std::string& registration_id)
67{
68 int id = util::IOUtil::to_value<int>(registration_id);
69 auto runner = runners_map_.get(id);
70 if (!runner) {
71 return false;
72 }
73 runner->cancel();
74 return true;
75}
76
77void
78reliable_topic::on_shutdown()
79{
80 // cancel all runners
81 for (auto& entry : runners_map_.clear()) {
82 entry.second->cancel();
83 }
84}
85
86void
87reliable_topic::on_destroy()
88{
89 // cancel all runners
90 for (auto& entry : runners_map_.clear()) {
91 entry.second->cancel();
92 }
93}
94
95void
96reliable_topic::post_destroy()
97{
98 // destroy the underlying ringbuffer
99 ringbuffer_.get()->destroy().get();
100}
101
102namespace topic {
104 int64_t initial_sequence_id)
105 : loss_tolerant_(loss_tolerant)
106 , initial_sequence_id_(initial_sequence_id)
107{}
108} // namespace topic
109
110namespace impl {
111ClientLockReferenceIdGenerator::ClientLockReferenceIdGenerator()
112 : reference_id_counter_(0)
113{}
114
115int64_t
116ClientLockReferenceIdGenerator::get_next_reference_id()
117{
118 return ++reference_id_counter_;
119}
120} // namespace impl
121
122namespace proxy {
123MultiMapImpl::MultiMapImpl(const std::string& instance_name,
124 spi::ClientContext* context)
125 : ProxyImpl(multi_map::SERVICE_NAME, instance_name, context)
126{
127 // TODO: remove this line once the client instance get_distributed_object
128 // works as expected in Java for this proxy type
129 lock_reference_id_generator_ =
130 get_context().get_lock_reference_id_generator();
131}
132
133boost::future<bool>
134MultiMapImpl::put(const serialization::pimpl::data& key,
135 const serialization::pimpl::data& value)
136{
137 auto request = protocol::codec::multimap_put_encode(
138 get_name(), key, value, util::get_current_thread_id());
139 return invoke_and_get_future<bool>(request, key);
140}
141
142boost::future<std::vector<serialization::pimpl::data>>
143MultiMapImpl::get_data(const serialization::pimpl::data& key)
144{
145 auto request = protocol::codec::multimap_get_encode(
146 get_name(), key, util::get_current_thread_id());
147 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
148 request, key);
149}
150
151boost::future<bool>
152MultiMapImpl::remove(const serialization::pimpl::data& key,
153 const serialization::pimpl::data& value)
154{
155 auto request = protocol::codec::multimap_removeentry_encode(
156 get_name(), key, value, util::get_current_thread_id());
157 return invoke_and_get_future<bool>(request, key);
158}
159
160boost::future<std::vector<serialization::pimpl::data>>
161MultiMapImpl::remove_data(const serialization::pimpl::data& key)
162{
163 auto request = protocol::codec::multimap_remove_encode(
164 get_name(), key, util::get_current_thread_id());
165 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
166 request, key);
167}
168
169boost::future<std::vector<serialization::pimpl::data>>
170MultiMapImpl::key_set_data()
171{
172 auto request = protocol::codec::multimap_keyset_encode(get_name());
173 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
174 request);
175}
176
177boost::future<std::vector<serialization::pimpl::data>>
178MultiMapImpl::values_data()
179{
180 auto request = protocol::codec::multimap_values_encode(get_name());
181 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
182 request);
183}
184
185boost::future<EntryVector>
186MultiMapImpl::entry_set_data()
187{
188 auto request = protocol::codec::multimap_entryset_encode(get_name());
189 return invoke_and_get_future<EntryVector>(request);
190}
191
192boost::future<bool>
193MultiMapImpl::contains_key(const serialization::pimpl::data& key)
194{
195 auto request = protocol::codec::multimap_containskey_encode(
196 get_name(), key, util::get_current_thread_id());
197 return invoke_and_get_future<bool>(request, key);
198}
199
200boost::future<bool>
201MultiMapImpl::contains_value(const serialization::pimpl::data& value)
202{
203 auto request =
204 protocol::codec::multimap_containsvalue_encode(get_name(), value);
205 return invoke_and_get_future<bool>(request);
206}
207
208boost::future<bool>
209MultiMapImpl::contains_entry(const serialization::pimpl::data& key,
210 const serialization::pimpl::data& value)
211{
212 auto request = protocol::codec::multimap_containsentry_encode(
213 get_name(), key, value, util::get_current_thread_id());
214 return invoke_and_get_future<bool>(request, key);
215}
216
217boost::future<int>
218MultiMapImpl::size()
219{
220 auto request = protocol::codec::multimap_size_encode(get_name());
221 return invoke_and_get_future<int>(request);
222}
223
224boost::future<void>
225MultiMapImpl::clear()
226{
227 auto request = protocol::codec::multimap_clear_encode(get_name());
228 return to_void_future(invoke(request));
229}
230
231boost::future<int>
232MultiMapImpl::value_count(const serialization::pimpl::data& key)
233{
234 auto request = protocol::codec::multimap_valuecount_encode(
235 get_name(), key, util::get_current_thread_id());
236 return invoke_and_get_future<int>(request, key);
237}
238
239boost::future<boost::uuids::uuid>
240MultiMapImpl::add_entry_listener(
241 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
242 bool include_value)
243{
244 return register_listener(
245 create_multi_map_entry_listener_codec(include_value),
246 std::move(entry_event_handler));
247}
248
249boost::future<boost::uuids::uuid>
250MultiMapImpl::add_entry_listener(
251 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
252 bool include_value,
253 serialization::pimpl::data&& key)
254{
255 return register_listener(
256 create_multi_map_entry_listener_codec(include_value, std::move(key)),
257 std::move(entry_event_handler));
258}
259
260boost::future<bool>
261MultiMapImpl::remove_entry_listener(boost::uuids::uuid registration_id)
262{
263 return get_context().get_client_listener_service().deregister_listener(
264 registration_id);
265}
266
267boost::future<void>
268MultiMapImpl::lock(const serialization::pimpl::data& key)
269{
270 return lock(key, std::chrono::milliseconds(-1));
271}
272
273boost::future<void>
274MultiMapImpl::lock(const serialization::pimpl::data& key,
275 std::chrono::milliseconds lease_time)
276{
277 auto request = protocol::codec::multimap_lock_encode(
278 get_name(),
279 key,
280 util::get_current_thread_id(),
281 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
282 lock_reference_id_generator_->get_next_reference_id());
283 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
284}
285
286boost::future<bool>
287MultiMapImpl::is_locked(const serialization::pimpl::data& key)
288{
289 auto request = protocol::codec::multimap_islocked_encode(get_name(), key);
290 return invoke_and_get_future<bool>(request, key);
291}
292
293boost::future<bool>
294MultiMapImpl::try_lock(const serialization::pimpl::data& key)
295{
296 auto request = protocol::codec::multimap_trylock_encode(
297 get_name(),
298 key,
299 util::get_current_thread_id(),
300 INT64_MAX,
301 0,
302 lock_reference_id_generator_->get_next_reference_id());
303 return invoke_and_get_future<bool>(request, key);
304}
305
306boost::future<bool>
307MultiMapImpl::try_lock(const serialization::pimpl::data& key,
308 std::chrono::milliseconds timeout)
309{
310 return try_lock(key, timeout, std::chrono::milliseconds(INT64_MAX));
311}
312
313boost::future<bool>
314MultiMapImpl::try_lock(const serialization::pimpl::data& key,
315 std::chrono::milliseconds timeout,
316 std::chrono::milliseconds lease_time)
317{
318 auto request = protocol::codec::multimap_trylock_encode(
319 get_name(),
320 key,
321 util::get_current_thread_id(),
322 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
323 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
324 lock_reference_id_generator_->get_next_reference_id());
325 return invoke_and_get_future<bool>(request, key);
326}
327
328boost::future<void>
329MultiMapImpl::unlock(const serialization::pimpl::data& key)
330{
331 auto request = protocol::codec::multimap_unlock_encode(
332 get_name(),
333 key,
334 util::get_current_thread_id(),
335 lock_reference_id_generator_->get_next_reference_id());
336 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
337}
338
339boost::future<void>
340MultiMapImpl::force_unlock(const serialization::pimpl::data& key)
341{
342 auto request = protocol::codec::multimap_forceunlock_encode(
343 get_name(), key, lock_reference_id_generator_->get_next_reference_id());
344 return to_void_future(invoke_on_partition(request, get_partition_id(key)));
345}
346
347std::shared_ptr<spi::impl::ListenerMessageCodec>
348MultiMapImpl::create_multi_map_entry_listener_codec(bool include_value)
349{
350 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
351 new MultiMapEntryListenerMessageCodec(get_name(), include_value));
352}
353
354std::shared_ptr<spi::impl::ListenerMessageCodec>
355MultiMapImpl::create_multi_map_entry_listener_codec(
356 bool include_value,
357 serialization::pimpl::data&& key)
358{
359 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
360 new MultiMapEntryListenerToKeyCodec(
361 get_name(), include_value, std::move(key)));
362}
363
364void
365MultiMapImpl::on_initialize()
366{
367 ProxyImpl::on_initialize();
368 lock_reference_id_generator_ =
369 get_context().get_lock_reference_id_generator();
370}
371
372MultiMapImpl::MultiMapEntryListenerMessageCodec::
373 MultiMapEntryListenerMessageCodec(std::string name, bool include_value)
374 : name_(std::move(name))
375 , include_value_(include_value)
376{}
377
378protocol::ClientMessage
379MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_add_request(
380 bool local_only) const
381{
382 return protocol::codec::multimap_addentrylistener_encode(
383 name_, include_value_, local_only);
384}
385
386protocol::ClientMessage
387MultiMapImpl::MultiMapEntryListenerMessageCodec::encode_remove_request(
388 boost::uuids::uuid real_registration_id) const
389{
390 return protocol::codec::multimap_removeentrylistener_encode(
391 name_, real_registration_id);
392}
393
394protocol::ClientMessage
395MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_add_request(
396 bool local_only) const
397{
398 return protocol::codec::multimap_addentrylistenertokey_encode(
399 name_, key_, include_value_, local_only);
400}
401
402protocol::ClientMessage
403MultiMapImpl::MultiMapEntryListenerToKeyCodec::encode_remove_request(
404 boost::uuids::uuid real_registration_id) const
405{
406 return protocol::codec::multimap_removeentrylistener_encode(
407 name_, real_registration_id);
408}
409
410MultiMapImpl::MultiMapEntryListenerToKeyCodec::MultiMapEntryListenerToKeyCodec(
411 std::string name,
412 bool include_value,
413 serialization::pimpl::data&& key)
414 : name_(std::move(name))
415 , include_value_(include_value)
416 , key_(std::move(key))
417{}
418
419const std::shared_ptr<std::unordered_set<member>>
420 PNCounterImpl::EMPTY_ADDRESS_LIST(new std::unordered_set<member>());
421
422PNCounterImpl::PNCounterImpl(const std::string& service_name,
423 const std::string& object_name,
424 spi::ClientContext* context)
425 : ProxyImpl(service_name, object_name, context)
426 , max_configured_replica_count_(0)
427 , observed_clock_(
428 std::shared_ptr<impl::vector_clock>(new impl::vector_clock()))
429 , logger_(context->get_logger())
430{}
431
432std::ostream&
433operator<<(std::ostream& os, const PNCounterImpl& proxy)
434{
435 os << "PNCounter{name='" << proxy.get_name() << "\'}";
436 return os;
437}
438
439boost::future<int64_t>
440PNCounterImpl::get()
441{
442 boost::shared_ptr<member> target =
443 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
444 if (!target) {
445 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
446 "ClientPNCounterProxy::get",
447 "Cannot invoke operations on a CRDT because the cluster does not "
448 "contain any data members"));
449 }
450 return invoke_get_internal(EMPTY_ADDRESS_LIST, nullptr, target);
451}
452
453boost::future<int64_t>
454PNCounterImpl::get_and_add(int64_t delta)
455{
456 boost::shared_ptr<member> target =
457 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
458 if (!target) {
459 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
460 "ClientPNCounterProxy::getAndAdd",
461 "Cannot invoke operations on a CRDT because the cluster does not "
462 "contain any data members"));
463 }
464 return invoke_add_internal(
465 delta, true, EMPTY_ADDRESS_LIST, nullptr, target);
466}
467
468boost::future<int64_t>
469PNCounterImpl::add_and_get(int64_t delta)
470{
471 boost::shared_ptr<member> target =
472 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
473 if (!target) {
474 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
475 "ClientPNCounterProxy::addAndGet",
476 "Cannot invoke operations on a CRDT because the cluster does not "
477 "contain any data members"));
478 }
479 return invoke_add_internal(
480 delta, false, EMPTY_ADDRESS_LIST, nullptr, target);
481}
482
483boost::future<int64_t>
484PNCounterImpl::get_and_subtract(int64_t delta)
485{
486 boost::shared_ptr<member> target =
487 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
488 if (!target) {
489 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
490 "ClientPNCounterProxy::getAndSubtract",
491 "Cannot invoke operations on a CRDT because the cluster does not "
492 "contain any data members"));
493 }
494 return invoke_add_internal(
495 -delta, true, EMPTY_ADDRESS_LIST, nullptr, target);
496}
497
498boost::future<int64_t>
499PNCounterImpl::subtract_and_get(int64_t delta)
500{
501 boost::shared_ptr<member> target =
502 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
503 if (!target) {
504 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
505 "ClientPNCounterProxy::subtractAndGet",
506 "Cannot invoke operations on a CRDT because the cluster does not "
507 "contain any data members"));
508 }
509 return invoke_add_internal(
510 -delta, false, EMPTY_ADDRESS_LIST, nullptr, target);
511}
512
513boost::future<int64_t>
514PNCounterImpl::decrement_and_get()
515{
516 boost::shared_ptr<member> target =
517 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
518 if (!target) {
519 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
520 "ClientPNCounterProxy::decrementAndGet",
521 "Cannot invoke operations on a CRDT because the cluster does not "
522 "contain any data members"));
523 }
524 return invoke_add_internal(-1, false, EMPTY_ADDRESS_LIST, nullptr, target);
525}
526
527boost::future<int64_t>
528PNCounterImpl::increment_and_get()
529{
530 boost::shared_ptr<member> target =
531 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
532 if (!target) {
533 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
534 "ClientPNCounterProxy::incrementAndGet",
535 "Cannot invoke operations on a CRDT because the cluster does not "
536 "contain any data members"));
537 }
538 return invoke_add_internal(1, false, EMPTY_ADDRESS_LIST, nullptr, target);
539}
540
541boost::future<int64_t>
542PNCounterImpl::get_and_decrement()
543{
544 boost::shared_ptr<member> target =
545 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
546 if (!target) {
547 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
548 "ClientPNCounterProxy::getAndDecrement",
549 "Cannot invoke operations on a CRDT because the cluster does not "
550 "contain any data members"));
551 }
552 return invoke_add_internal(-1, true, EMPTY_ADDRESS_LIST, nullptr, target);
553}
554
555boost::future<int64_t>
556PNCounterImpl::get_and_increment()
557{
558 boost::shared_ptr<member> target =
559 get_crdt_operation_target(*EMPTY_ADDRESS_LIST);
560 if (!target) {
561 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
562 "ClientPNCounterProxy::getAndIncrement",
563 "Cannot invoke operations on a CRDT because the cluster does not "
564 "contain any data members"));
565 }
566 return invoke_add_internal(1, true, EMPTY_ADDRESS_LIST, nullptr, target);
567}
568
569boost::future<void>
570PNCounterImpl::reset()
571{
572 observed_clock_ =
573 std::shared_ptr<impl::vector_clock>(new impl::vector_clock());
574 return boost::make_ready_future();
575}
576
577boost::shared_ptr<member>
578PNCounterImpl::get_crdt_operation_target(
579 const std::unordered_set<member>& excluded_addresses)
580{
581 auto replicaAddress = current_target_replica_address_.load();
582 if (replicaAddress &&
583 excluded_addresses.find(*replicaAddress) == excluded_addresses.end()) {
584 return replicaAddress;
585 }
586
587 {
588 std::lock_guard<std::mutex> guard(target_selection_mutex_);
589 replicaAddress = current_target_replica_address_.load();
590 if (!replicaAddress || excluded_addresses.find(*replicaAddress) !=
591 excluded_addresses.end()) {
592 current_target_replica_address_ =
593 choose_target_replica(excluded_addresses);
594 }
595 }
596 return current_target_replica_address_;
597}
598
599boost::shared_ptr<member>
600PNCounterImpl::choose_target_replica(
601 const std::unordered_set<member>& excluded_addresses)
602{
603 std::vector<member> replicaAddresses =
604 get_replica_addresses(excluded_addresses);
605 if (replicaAddresses.empty()) {
606 return nullptr;
607 }
608 // TODO: Use a random generator as used in Java (ThreadLocalRandomProvider)
609 // which is per thread
610 int randomReplicaIndex = std::abs(rand()) % (int)replicaAddresses.size();
611 return boost::make_shared<member>(replicaAddresses[randomReplicaIndex]);
612}
613
614std::vector<member>
615PNCounterImpl::get_replica_addresses(
616 const std::unordered_set<member>& excluded_members)
617{
618 std::vector<member> dataMembers =
619 get_context().get_client_cluster_service().get_members(
620 *member_selectors::DATA_MEMBER_SELECTOR);
621 int32_t replicaCount = get_max_configured_replica_count();
622 int currentReplicaCount =
623 util::min<int>(replicaCount, (int)dataMembers.size());
624
625 std::vector<member> replicaMembers;
626 for (int i = 0; i < currentReplicaCount; i++) {
627 if (excluded_members.find(dataMembers[i]) == excluded_members.end()) {
628 replicaMembers.push_back(dataMembers[i]);
629 }
630 }
631 return replicaMembers;
632}
633
634int32_t
635PNCounterImpl::get_max_configured_replica_count()
636{
637 if (max_configured_replica_count_ > 0) {
638 return max_configured_replica_count_;
639 } else {
640 auto request =
641 protocol::codec::pncounter_getconfiguredreplicacount_encode(
642 get_name());
643 max_configured_replica_count_ =
644 invoke_and_get_future<int32_t>(request).get();
645 }
646 return max_configured_replica_count_;
647}
648
649boost::shared_ptr<member>
650PNCounterImpl::try_choose_a_new_target(
651 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
652 boost::shared_ptr<member> last_target,
653 const exception::hazelcast_& last_exception)
654{
655 HZ_LOG(
656 logger_,
657 finest,
658 boost::str(boost::format(
659 "Exception occurred while invoking operation on target %1%, "
660 "choosing different target. Cause: %2%") %
661 last_target % last_exception));
662 if (excluded_addresses == EMPTY_ADDRESS_LIST) {
663 // TODO: Make sure that this only affects the local variable of the
664 // method
665 excluded_addresses = std::make_shared<std::unordered_set<member>>();
666 }
667 excluded_addresses->insert(*last_target);
668 return get_crdt_operation_target(*excluded_addresses);
669}
670
671boost::future<int64_t>
672PNCounterImpl::invoke_get_internal(
673 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
674 std::exception_ptr last_exception,
675 const boost::shared_ptr<member>& target)
676{
677 if (!target) {
678 if (last_exception) {
679 std::rethrow_exception(last_exception);
680 } else {
681 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
682 "ClientPNCounterProxy::invokeGetInternal",
683 "Cannot invoke operations on a CRDT because the cluster does not "
684 "contain any data members"));
685 }
686 }
687 try {
688 auto timestamps = observed_clock_.get()->entry_set();
689 auto request = protocol::codec::pncounter_get_encode(
690 get_name(), timestamps, target->get_uuid());
691 return invoke_on_member(request, target->get_uuid())
692 .then(
693 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
694 try {
695 return get_and_update_timestamps(std::move(f));
696 } catch (exception::hazelcast_& e) {
697 return invoke_get_internal(excluded_addresses,
698 std::current_exception(),
699 try_choose_a_new_target(
700 excluded_addresses, target, e))
701 .get();
702 }
703 });
704 } catch (exception::hazelcast_& e) {
705 return invoke_get_internal(
706 excluded_addresses,
707 std::current_exception(),
708 try_choose_a_new_target(excluded_addresses, target, e));
709 }
710}
711
712boost::future<int64_t>
713PNCounterImpl::invoke_add_internal(
714 int64_t delta,
715 bool getBeforeUpdate,
716 std::shared_ptr<std::unordered_set<member>> excluded_addresses,
717 std::exception_ptr last_exception,
718 const boost::shared_ptr<member>& target)
719{
720 if (!target) {
721 if (last_exception) {
722 std::rethrow_exception(last_exception);
723 } else {
724 BOOST_THROW_EXCEPTION(exception::no_data_member_in_cluster(
725 "ClientPNCounterProxy::invokeGetInternal",
726 "Cannot invoke operations on a CRDT because the cluster does not "
727 "contain any data members"));
728 }
729 }
730
731 try {
732 auto request = protocol::codec::pncounter_add_encode(
733 get_name(),
734 delta,
735 getBeforeUpdate,
736 observed_clock_.get()->entry_set(),
737 target->get_uuid());
738 return invoke_on_member(request, target->get_uuid())
739 .then(
740 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
741 try {
742 return get_and_update_timestamps(std::move(f));
743 } catch (exception::hazelcast_& e) {
744 return invoke_add_internal(delta,
745 getBeforeUpdate,
746 excluded_addresses,
747 std::current_exception(),
748 try_choose_a_new_target(
749 excluded_addresses, target, e))
750 .get();
751 }
752 });
753 } catch (exception::hazelcast_& e) {
754 return invoke_add_internal(
755 delta,
756 getBeforeUpdate,
757 excluded_addresses,
758 std::current_exception(),
759 try_choose_a_new_target(excluded_addresses, target, e));
760 }
761}
762
763int64_t
764PNCounterImpl::get_and_update_timestamps(
765 boost::future<protocol::ClientMessage> f)
766{
767 auto msg = f.get();
768 auto value = msg.get_first_fixed_sized_field<int64_t>();
769 // skip replica count
770 msg.get<int32_t>();
771 update_observed_replica_timestamps(
772 msg.get<impl::vector_clock::timestamp_vector>());
773 return value;
774}
775
776void
777PNCounterImpl::update_observed_replica_timestamps(
778 const impl::vector_clock::timestamp_vector& received_logical_timestamps)
779{
780 std::shared_ptr<impl::vector_clock> received =
781 to_vector_clock(received_logical_timestamps);
782 for (;;) {
783 std::shared_ptr<impl::vector_clock> currentClock =
784 this->observed_clock_;
785 if (currentClock->is_after(*received)) {
786 break;
787 }
788 if (observed_clock_.compare_and_set(currentClock, received)) {
789 break;
790 }
791 }
792}
793
794std::shared_ptr<impl::vector_clock>
795PNCounterImpl::to_vector_clock(
796 const impl::vector_clock::timestamp_vector& replica_logical_timestamps)
797{
798 return std::shared_ptr<impl::vector_clock>(
799 new impl::vector_clock(replica_logical_timestamps));
800}
801
802boost::shared_ptr<member>
803PNCounterImpl::get_current_target_replica_address()
804{
805 return current_target_replica_address_.load();
806}
807
808IListImpl::IListImpl(const std::string& instance_name,
809 spi::ClientContext* context)
810 : ProxyImpl("hz:impl:listService", instance_name, context)
811{
812 serialization::pimpl::data key_data =
813 get_context().get_serialization_service().to_data<std::string>(
814 &instance_name);
815 partition_id_ = get_partition_id(key_data);
816}
817
818boost::future<bool>
819IListImpl::remove_item_listener(boost::uuids::uuid registration_id)
820{
821 return get_context().get_client_listener_service().deregister_listener(
822 registration_id);
823}
824
825boost::future<int>
826IListImpl::size()
827{
828 auto request = protocol::codec::list_size_encode(get_name());
829 return invoke_and_get_future<int>(request, partition_id_);
830}
831
832boost::future<bool>
833IListImpl::is_empty()
834{
835 auto request = protocol::codec::list_isempty_encode(get_name());
836 return invoke_and_get_future<bool>(request, partition_id_);
837}
838
839boost::future<bool>
840IListImpl::contains(const serialization::pimpl::data& element)
841{
842 auto request = protocol::codec::list_contains_encode(get_name(), element);
843 return invoke_and_get_future<bool>(request, partition_id_);
844}
845
846boost::future<std::vector<serialization::pimpl::data>>
847IListImpl::to_array_data()
848{
849 auto request = protocol::codec::list_getall_encode(get_name());
850 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
851 request, partition_id_);
852}
853
854boost::future<bool>
855IListImpl::add(const serialization::pimpl::data& element)
856{
857 auto request = protocol::codec::list_add_encode(get_name(), element);
858 return invoke_and_get_future<bool>(request, partition_id_);
859}
860
861boost::future<bool>
862IListImpl::remove(const serialization::pimpl::data& element)
863{
864 auto request = protocol::codec::list_remove_encode(get_name(), element);
865 return invoke_and_get_future<bool>(request, partition_id_);
866}
867
868boost::future<bool>
869IListImpl::contains_all_data(
870 const std::vector<serialization::pimpl::data>& elements)
871{
872 auto request =
873 protocol::codec::list_containsall_encode(get_name(), elements);
874 return invoke_and_get_future<bool>(request, partition_id_);
875}
876
877boost::future<bool>
878IListImpl::add_all_data(const std::vector<serialization::pimpl::data>& elements)
879{
880 auto request = protocol::codec::list_addall_encode(get_name(), elements);
881 return invoke_and_get_future<bool>(request, partition_id_);
882}
883
884boost::future<bool>
885IListImpl::add_all_data(int index,
886 const std::vector<serialization::pimpl::data>& elements)
887{
888 auto request =
889 protocol::codec::list_addallwithindex_encode(get_name(), index, elements);
890 return invoke_and_get_future<bool>(request, partition_id_);
891}
892
893boost::future<bool>
894IListImpl::remove_all_data(
895 const std::vector<serialization::pimpl::data>& elements)
896{
897 auto request =
898 protocol::codec::list_compareandremoveall_encode(get_name(), elements);
899 return invoke_and_get_future<bool>(request, partition_id_);
900}
901
902boost::future<bool>
903IListImpl::retain_all_data(
904 const std::vector<serialization::pimpl::data>& elements)
905{
906 auto request =
907 protocol::codec::list_compareandretainall_encode(get_name(), elements);
908 return invoke_and_get_future<bool>(request, partition_id_);
909}
910
911boost::future<void>
912IListImpl::clear()
913{
914 auto request = protocol::codec::list_clear_encode(get_name());
915 return to_void_future(invoke_on_partition(request, partition_id_));
916}
917
918boost::future<boost::optional<serialization::pimpl::data>>
919IListImpl::get_data(int index)
920{
921 auto request = protocol::codec::list_get_encode(get_name(), index);
922 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
923 request, partition_id_);
924}
925
926boost::future<boost::optional<serialization::pimpl::data>>
927IListImpl::set_data(int index, const serialization::pimpl::data& element)
928{
929 auto request = protocol::codec::list_set_encode(get_name(), index, element);
930 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
931 request, partition_id_);
932}
933
934boost::future<void>
935IListImpl::add(int index, const serialization::pimpl::data& element)
936{
937 auto request =
938 protocol::codec::list_addwithindex_encode(get_name(), index, element);
939 return to_void_future(invoke_on_partition(request, partition_id_));
940}
941
942boost::future<boost::optional<serialization::pimpl::data>>
943IListImpl::remove_data(int index)
944{
945 auto request =
946 protocol::codec::list_removewithindex_encode(get_name(), index);
947 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
948 request, partition_id_);
949}
950
951boost::future<int>
952IListImpl::index_of(const serialization::pimpl::data& element)
953{
954 auto request = protocol::codec::list_indexof_encode(get_name(), element);
955 return invoke_and_get_future<int>(request, partition_id_);
956}
957
958boost::future<int>
959IListImpl::last_index_of(const serialization::pimpl::data& element)
960{
961 auto request =
962 protocol::codec::list_lastindexof_encode(get_name(), element);
963 return invoke_and_get_future<int>(request, partition_id_);
964}
965
966boost::future<std::vector<serialization::pimpl::data>>
967IListImpl::sub_list_data(int from_index, int to_index)
968{
969 auto request =
970 protocol::codec::list_sub_encode(get_name(), from_index, to_index);
971 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
972 request, partition_id_);
973}
974
975std::shared_ptr<spi::impl::ListenerMessageCodec>
976IListImpl::create_item_listener_codec(bool include_value)
977{
978 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
979 new ListListenerMessageCodec(get_name(), include_value));
980}
981
982IListImpl::ListListenerMessageCodec::ListListenerMessageCodec(
983 std::string name,
984 bool include_value)
985 : name_(std::move(name))
986 , include_value_(include_value)
987{}
988
989protocol::ClientMessage
990IListImpl::ListListenerMessageCodec::encode_add_request(bool local_only) const
991{
992 return protocol::codec::list_addlistener_encode(
993 name_, include_value_, local_only);
994}
995
996protocol::ClientMessage
997IListImpl::ListListenerMessageCodec::encode_remove_request(
998 boost::uuids::uuid real_registration_id) const
999{
1000 return protocol::codec::list_removelistener_encode(name_,
1001 real_registration_id);
1002}
1003
1004flake_id_generator_impl::Block::Block(IdBatch&& id_batch,
1005 std::chrono::milliseconds validity)
1006 : id_batch_(id_batch)
1007 , invalid_since_(std::chrono::steady_clock::now() + validity)
1008 , num_returned_(0)
1009{}
1010
1011int64_t
1012flake_id_generator_impl::Block::next()
1013{
1014 if (invalid_since_ <= std::chrono::steady_clock::now()) {
1015 return INT64_MIN;
1016 }
1017 int32_t index;
1018 do {
1019 index = num_returned_;
1020 if (index == id_batch_.get_batch_size()) {
1021 return INT64_MIN;
1022 }
1023 } while (!num_returned_.compare_exchange_strong(index, index + 1));
1024
1025 return id_batch_.get_base() + index * id_batch_.get_increment();
1026}
1027
1028flake_id_generator_impl::IdBatch::IdIterator
1029 flake_id_generator_impl::IdBatch::endOfBatch;
1030
1031int64_t
1032flake_id_generator_impl::IdBatch::get_base() const
1033{
1034 return base_;
1035}
1036
1037int64_t
1038flake_id_generator_impl::IdBatch::get_increment() const
1039{
1040 return increment_;
1041}
1042
1043int32_t
1044flake_id_generator_impl::IdBatch::get_batch_size() const
1045{
1046 return batch_size_;
1047}
1048
1049flake_id_generator_impl::IdBatch::IdBatch(int64_t base,
1050 int64_t increment,
1051 int32_t batch_size)
1052 : base_(base)
1053 , increment_(increment)
1054 , batch_size_(batch_size)
1055{}
1056
1057flake_id_generator_impl::IdBatch::IdIterator&
1058flake_id_generator_impl::IdBatch::end()
1059{
1060 return endOfBatch;
1061}
1062
1063flake_id_generator_impl::IdBatch::IdIterator
1064flake_id_generator_impl::IdBatch::iterator()
1065{
1066 return flake_id_generator_impl::IdBatch::IdIterator(
1067 base_, increment_, batch_size_);
1068}
1069
1070flake_id_generator_impl::IdBatch::IdIterator::IdIterator(
1071 int64_t base2,
1072 const int64_t increment,
1073 int32_t remaining)
1074 : base2_(base2)
1075 , increment_(increment)
1076 , remaining_(remaining)
1077{}
1078
1079bool
1080flake_id_generator_impl::IdBatch::IdIterator::operator==(
1081 const flake_id_generator_impl::IdBatch::IdIterator& rhs) const
1082{
1083 return base2_ == rhs.base2_ && increment_ == rhs.increment_ &&
1084 remaining_ == rhs.remaining_;
1085}
1086
1087bool
1088flake_id_generator_impl::IdBatch::IdIterator::operator!=(
1089 const flake_id_generator_impl::IdBatch::IdIterator& rhs) const
1090{
1091 return !(rhs == *this);
1092}
1093
1094flake_id_generator_impl::IdBatch::IdIterator::IdIterator()
1095 : base2_(-1)
1096 , increment_(-1)
1097 , remaining_(-1)
1098{}
1099
1100flake_id_generator_impl::IdBatch::IdIterator&
1101flake_id_generator_impl::IdBatch::IdIterator::operator++()
1102{
1103 if (remaining_ == 0) {
1104 return flake_id_generator_impl::IdBatch::end();
1105 }
1106
1107 --remaining_;
1108
1109 base2_ += increment_;
1110
1111 return *this;
1112}
1113
1114flake_id_generator_impl::flake_id_generator_impl(
1115 const std::string& service_name,
1116 const std::string& object_name,
1117 spi::ClientContext* context)
1118 : ProxyImpl(service_name, object_name, context)
1119 , block_(nullptr)
1120{
1121 auto config =
1122 context->get_client_config().find_flake_id_generator_config(object_name);
1123 batch_size_ = config->get_prefetch_count();
1124 validity_ = config->get_prefetch_validity_duration();
1125}
1126
1127int64_t
1128flake_id_generator_impl::new_id_internal()
1129{
1130 auto b = block_.load();
1131 if (b) {
1132 int64_t res = b->next();
1133 if (res != INT64_MIN) {
1134 return res;
1135 }
1136 }
1137
1138 throw std::overflow_error("");
1139}
1140
1141boost::future<int64_t>
1142flake_id_generator_impl::new_id()
1143{
1144 try {
1145 return boost::make_ready_future(new_id_internal());
1146 } catch (std::overflow_error&) {
1147 return new_id_batch(batch_size_)
1148 .then(boost::launch::sync,
1149 [=](boost::future<flake_id_generator_impl::IdBatch> f) {
1150 auto newBlock =
1151 boost::make_shared<Block>(f.get(), validity_);
1152 auto value = newBlock->next();
1153 auto b = block_.load();
1154 block_.compare_exchange_strong(b, newBlock);
1155 return value;
1156 });
1157 }
1158}
1159
1160boost::future<flake_id_generator_impl::IdBatch>
1161flake_id_generator_impl::new_id_batch(int32_t size)
1162{
1163 auto request =
1164 protocol::codec::flakeidgenerator_newidbatch_encode(get_name(), size);
1165 return invoke(request).then(
1166 boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
1167 auto msg = f.get();
1168 msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN);
1169
1170 auto base = msg.get<int64_t>();
1171 auto increment = msg.get<int64_t>();
1172 auto batch_size = msg.get<int32_t>();
1173 return flake_id_generator_impl::IdBatch(base, increment, batch_size);
1174 });
1175}
1176
1177IQueueImpl::IQueueImpl(const std::string& instance_name,
1178 spi::ClientContext* context)
1179 : ProxyImpl("hz:impl:queueService", instance_name, context)
1180{
1181 serialization::pimpl::data data =
1182 get_context().get_serialization_service().to_data<std::string>(
1183 &instance_name);
1184 partition_id_ = get_partition_id(data);
1185}
1186
1187boost::future<bool>
1188IQueueImpl::remove_item_listener(boost::uuids::uuid registration_id)
1189{
1190 return get_context().get_client_listener_service().deregister_listener(
1191 registration_id);
1192}
1193
1194boost::future<bool>
1195IQueueImpl::offer(const serialization::pimpl::data& element,
1196 std::chrono::milliseconds timeout)
1197{
1198 auto request = protocol::codec::queue_offer_encode(
1199 get_name(),
1200 element,
1201 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1202 return invoke_and_get_future<bool>(request, partition_id_);
1203}
1204
1205boost::future<void>
1206IQueueImpl::put(const serialization::pimpl::data& element)
1207{
1208 auto request = protocol::codec::queue_put_encode(get_name(), element);
1209 return to_void_future(invoke_on_partition(request, partition_id_));
1210}
1211
1212boost::future<boost::optional<serialization::pimpl::data>>
1213IQueueImpl::poll_data(std::chrono::milliseconds timeout)
1214{
1215 auto request = protocol::codec::queue_poll_encode(
1216 get_name(),
1217 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1218 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1219 request, partition_id_);
1220}
1221
1222boost::future<int>
1223IQueueImpl::remaining_capacity()
1224{
1225 auto request = protocol::codec::queue_remainingcapacity_encode(get_name());
1226 return invoke_and_get_future<int>(request, partition_id_);
1227}
1228
1229boost::future<bool>
1230IQueueImpl::remove(const serialization::pimpl::data& element)
1231{
1232 auto request = protocol::codec::queue_remove_encode(get_name(), element);
1233 return invoke_and_get_future<bool>(request, partition_id_);
1234}
1235
1236boost::future<bool>
1237IQueueImpl::contains(const serialization::pimpl::data& element)
1238{
1239 auto request = protocol::codec::queue_contains_encode(get_name(), element);
1240 return invoke_and_get_future<bool>(request, partition_id_);
1241}
1242
1243boost::future<std::vector<serialization::pimpl::data>>
1244IQueueImpl::drain_to_data(size_t max_elements)
1245{
1246 auto request = protocol::codec::queue_draintomaxsize_encode(
1247 get_name(), (int32_t)max_elements);
1248
1249 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1250 request, partition_id_);
1251}
1252
1253boost::future<std::vector<serialization::pimpl::data>>
1254IQueueImpl::drain_to_data()
1255{
1256 auto request = protocol::codec::queue_drainto_encode(get_name());
1257 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1258 request, partition_id_);
1259}
1260
1261boost::future<boost::optional<serialization::pimpl::data>>
1262IQueueImpl::take_data()
1263{
1264 auto request = protocol::codec::queue_take_encode(get_name());
1265 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1266 request, partition_id_);
1267}
1268
1269boost::future<boost::optional<serialization::pimpl::data>>
1270IQueueImpl::peek_data()
1271{
1272 auto request = protocol::codec::queue_peek_encode(get_name());
1273 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1274 request, partition_id_);
1275}
1276
1277boost::future<int>
1278IQueueImpl::size()
1279{
1280 auto request = protocol::codec::queue_size_encode(get_name());
1281 return invoke_and_get_future<int>(request, partition_id_);
1282}
1283
1284boost::future<bool>
1285IQueueImpl::is_empty()
1286{
1287 auto request = protocol::codec::queue_isempty_encode(get_name());
1288 return invoke_and_get_future<bool>(request, partition_id_);
1289}
1290
1291boost::future<std::vector<serialization::pimpl::data>>
1292IQueueImpl::to_array_data()
1293{
1294 auto request = protocol::codec::queue_iterator_encode(get_name());
1295 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1296 request, partition_id_);
1297}
1298
1299boost::future<bool>
1300IQueueImpl::contains_all_data(
1301 const std::vector<serialization::pimpl::data>& elements)
1302{
1303 auto request =
1304 protocol::codec::queue_containsall_encode(get_name(), elements);
1305 return invoke_and_get_future<bool>(request, partition_id_);
1306}
1307
1308boost::future<bool>
1309IQueueImpl::add_all_data(
1310 const std::vector<serialization::pimpl::data>& elements)
1311{
1312 auto request = protocol::codec::queue_addall_encode(get_name(), elements);
1313 return invoke_and_get_future<bool>(request, partition_id_);
1314}
1315
1316boost::future<bool>
1317IQueueImpl::remove_all_data(
1318 const std::vector<serialization::pimpl::data>& elements)
1319{
1320 auto request =
1321 protocol::codec::queue_compareandremoveall_encode(get_name(), elements);
1322 return invoke_and_get_future<bool>(request, partition_id_);
1323}
1324
1325boost::future<bool>
1326IQueueImpl::retain_all_data(
1327 const std::vector<serialization::pimpl::data>& elements)
1328{
1329 auto request =
1330 protocol::codec::queue_compareandretainall_encode(get_name(), elements);
1331 return invoke_and_get_future<bool>(request, partition_id_);
1332}
1333
1334boost::future<void>
1335IQueueImpl::clear()
1336{
1337 auto request = protocol::codec::queue_clear_encode(get_name());
1338 return to_void_future(invoke_on_partition(request, partition_id_));
1339}
1340
1341std::shared_ptr<spi::impl::ListenerMessageCodec>
1342IQueueImpl::create_item_listener_codec(bool include_value)
1343{
1344 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
1345 new QueueListenerMessageCodec(get_name(), include_value));
1346}
1347
1348IQueueImpl::QueueListenerMessageCodec::QueueListenerMessageCodec(
1349 std::string name,
1350 bool include_value)
1351 : name_(std::move(name))
1352 , include_value_(include_value)
1353{}
1354
1355protocol::ClientMessage
1356IQueueImpl::QueueListenerMessageCodec::encode_add_request(bool local_only) const
1357{
1358 return protocol::codec::queue_addlistener_encode(
1359 name_, include_value_, local_only);
1360}
1361
1362protocol::ClientMessage
1363IQueueImpl::QueueListenerMessageCodec::encode_remove_request(
1364 boost::uuids::uuid real_registration_id) const
1365{
1366 return protocol::codec::queue_removelistener_encode(name_,
1367 real_registration_id);
1368}
1369
1370ProxyImpl::ProxyImpl(const std::string& service_name,
1371 const std::string& object_name,
1372 spi::ClientContext* context)
1373 : ClientProxy(object_name, service_name, *context)
1374 , SerializingProxy(*context, object_name)
1375{}
1376
1377ProxyImpl::~ProxyImpl() = default;
1378
1379SerializingProxy::SerializingProxy(spi::ClientContext& context,
1380 const std::string& object_name)
1381 : serialization_service_(context.get_serialization_service())
1382 , partition_service_(context.get_partition_service())
1383 , object_name_(object_name)
1384 , client_context_(context)
1385{}
1386
1387int
1388SerializingProxy::get_partition_id(const serialization::pimpl::data& key)
1389{
1390 return partition_service_.get_partition_id(key);
1391}
1392
1393boost::future<protocol::ClientMessage>
1394SerializingProxy::invoke_on_partition(protocol::ClientMessage& request,
1395 int partition_id)
1396{
1397 try {
1398 return spi::impl::ClientInvocation::create(
1399 client_context_,
1400 std::make_shared<protocol::ClientMessage>(std::move(request)),
1401 object_name_,
1402 partition_id)
1403 ->invoke();
1404 } catch (exception::iexception&) {
1405 util::exception_util::rethrow(std::current_exception());
1406 return boost::make_ready_future(protocol::ClientMessage(0));
1407 }
1408}
1409
1410boost::future<protocol::ClientMessage>
1411SerializingProxy::invoke(protocol::ClientMessage& request)
1412{
1413 try {
1414 return spi::impl::ClientInvocation::create(
1415 client_context_,
1416 std::make_shared<protocol::ClientMessage>(std::move(request)),
1417 object_name_)
1418 ->invoke();
1419 } catch (exception::iexception&) {
1420 util::exception_util::rethrow(std::current_exception());
1421 return boost::make_ready_future(protocol::ClientMessage(0));
1422 }
1423}
1424
1425boost::future<protocol::ClientMessage>
1426SerializingProxy::invoke_on_connection(
1427 protocol::ClientMessage& request,
1428 std::shared_ptr<connection::Connection> connection)
1429{
1430 try {
1431 return spi::impl::ClientInvocation::create(
1432 client_context_,
1433 std::make_shared<protocol::ClientMessage>(std::move(request)),
1434 object_name_,
1435 connection)
1436 ->invoke();
1437 } catch (exception::iexception&) {
1438 util::exception_util::rethrow(std::current_exception());
1439 return boost::make_ready_future(protocol::ClientMessage(0));
1440 }
1441}
1442
1443boost::future<protocol::ClientMessage>
1444SerializingProxy::invoke_on_key_owner(
1445 protocol::ClientMessage& request,
1446 const serialization::pimpl::data& key_data)
1447{
1448 try {
1449 return invoke_on_partition(request, get_partition_id(key_data));
1450 } catch (exception::iexception&) {
1451 util::exception_util::rethrow(std::current_exception());
1452 return boost::make_ready_future(protocol::ClientMessage(0));
1453 }
1454}
1455
1456boost::future<protocol::ClientMessage>
1457SerializingProxy::invoke_on_member(protocol::ClientMessage& request,
1458 boost::uuids::uuid uuid)
1459{
1460 try {
1461 auto invocation = spi::impl::ClientInvocation::create(
1462 client_context_,
1463 std::make_shared<protocol::ClientMessage>(std::move(request)),
1464 object_name_,
1465 uuid);
1466 return invocation->invoke();
1467 } catch (exception::iexception&) {
1468 util::exception_util::rethrow(std::current_exception());
1469 return boost::make_ready_future(protocol::ClientMessage(0));
1470 }
1471}
1472
1473template<>
1474boost::future<boost::optional<serialization::pimpl::data>>
1475SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request)
1476{
1477 return decode_optional_var_sized<serialization::pimpl::data>(
1478 invoke(request));
1479}
1480
1481template<>
1482boost::future<boost::optional<map::data_entry_view>>
1483SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1484 const serialization::pimpl::data& key)
1485{
1486 return decode_optional_var_sized<map::data_entry_view>(
1487 invoke_on_key_owner(request, key));
1488}
1489
1490template<>
1491boost::future<boost::optional<serialization::pimpl::data>>
1492SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1493 int partition_id)
1494{
1495 return decode_optional_var_sized<serialization::pimpl::data>(
1496 invoke_on_partition(request, partition_id));
1497}
1498
1499template<>
1500boost::future<boost::optional<serialization::pimpl::data>>
1501SerializingProxy::invoke_and_get_future(protocol::ClientMessage& request,
1502 const serialization::pimpl::data& key)
1503{
1504 return decode_optional_var_sized<serialization::pimpl::data>(
1505 invoke_on_key_owner(request, key));
1506}
1507
1508PartitionSpecificClientProxy::PartitionSpecificClientProxy(
1509 const std::string& service_name,
1510 const std::string& object_name,
1511 spi::ClientContext* context)
1512 : ProxyImpl(service_name, object_name, context)
1513 , partition_id_(-1)
1514{}
1515
1516void
1517PartitionSpecificClientProxy::on_initialize()
1518{
1519 std::string partitionKey = internal::partition::strategy::
1520 StringPartitioningStrategy::get_partition_key(name_);
1521 partition_id_ = get_context().get_partition_service().get_partition_id(
1522 to_data<std::string>(partitionKey));
1523}
1524
1525IMapImpl::IMapImpl(const std::string& instance_name,
1526 spi::ClientContext* context)
1527 : ProxyImpl("hz:impl:mapService", instance_name, context)
1528{}
1529
1530boost::future<bool>
1531IMapImpl::contains_key(const serialization::pimpl::data& key)
1532{
1533 auto request = protocol::codec::map_containskey_encode(
1534 get_name(), key, util::get_current_thread_id());
1535 return invoke_and_get_future<bool>(request, key);
1536}
1537
1538boost::future<bool>
1539IMapImpl::contains_value(const serialization::pimpl::data& value)
1540{
1541 auto request = protocol::codec::map_containsvalue_encode(get_name(), value);
1542 return invoke_and_get_future<bool>(request);
1543}
1544
1545boost::future<boost::optional<serialization::pimpl::data>>
1546IMapImpl::get_data(const serialization::pimpl::data& key)
1547{
1548 auto request = protocol::codec::map_get_encode(
1549 get_name(), key, util::get_current_thread_id());
1550 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1551 request, key);
1552}
1553
1554boost::future<boost::optional<serialization::pimpl::data>>
1555IMapImpl::remove_data(const serialization::pimpl::data& key)
1556{
1557 auto request = protocol::codec::map_remove_encode(
1558 get_name(), key, util::get_current_thread_id());
1559 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1560 request, key);
1561}
1562
1563boost::future<bool>
1564IMapImpl::remove(const serialization::pimpl::data& key,
1565 const serialization::pimpl::data& value)
1566{
1567 auto request = protocol::codec::map_removeifsame_encode(
1568 get_name(), key, value, util::get_current_thread_id());
1569 return invoke_and_get_future<bool>(request, key);
1570}
1571
1572boost::future<protocol::ClientMessage>
1573IMapImpl::remove_all(const serialization::pimpl::data& predicate_data)
1574{
1575 auto request =
1576 protocol::codec::map_removeall_encode(get_name(), predicate_data);
1577 return invoke(request);
1578}
1579
1580boost::future<protocol::ClientMessage>
1581IMapImpl::delete_entry(const serialization::pimpl::data& key)
1582{
1583 auto request = protocol::codec::map_delete_encode(
1584 get_name(), key, util::get_current_thread_id());
1585 return invoke_on_partition(request, get_partition_id(key));
1586}
1587
1588boost::future<protocol::ClientMessage>
1589IMapImpl::flush()
1590{
1591 auto request = protocol::codec::map_flush_encode(get_name());
1592 return invoke(request);
1593}
1594
1595boost::future<bool>
1596IMapImpl::try_remove(const serialization::pimpl::data& key,
1597 std::chrono::milliseconds timeout)
1598{
1599 auto request = protocol::codec::map_tryremove_encode(
1600 get_name(),
1601 key,
1602 util::get_current_thread_id(),
1603 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1604
1605 return invoke_and_get_future<bool>(request, key);
1606}
1607
1608boost::future<bool>
1609IMapImpl::try_put(const serialization::pimpl::data& key,
1610 const serialization::pimpl::data& value,
1611 std::chrono::milliseconds timeout)
1612{
1613 auto request = protocol::codec::map_tryput_encode(
1614 get_name(),
1615 key,
1616 value,
1617 util::get_current_thread_id(),
1618 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
1619
1620 return invoke_and_get_future<bool>(request, key);
1621}
1622
1623boost::future<boost::optional<serialization::pimpl::data>>
1624IMapImpl::put_data(const serialization::pimpl::data& key,
1625 const serialization::pimpl::data& value,
1626 std::chrono::milliseconds ttl)
1627{
1628 auto request = protocol::codec::map_put_encode(
1629 get_name(),
1630 key,
1631 value,
1632 util::get_current_thread_id(),
1633 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1634 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1635 request, key);
1636}
1637
1638boost::future<protocol::ClientMessage>
1639IMapImpl::put_transient(const serialization::pimpl::data& key,
1640 const serialization::pimpl::data& value,
1641 std::chrono::milliseconds ttl)
1642{
1643 auto request = protocol::codec::map_puttransient_encode(
1644 get_name(),
1645 key,
1646 value,
1647 util::get_current_thread_id(),
1648 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1649 return invoke_on_partition(request, get_partition_id(key));
1650}
1651
1652boost::future<boost::optional<serialization::pimpl::data>>
1653IMapImpl::put_if_absent_data(const serialization::pimpl::data& key,
1654 const serialization::pimpl::data& value,
1655 std::chrono::milliseconds ttl)
1656{
1657 auto request = protocol::codec::map_putifabsent_encode(
1658 get_name(),
1659 key,
1660 value,
1661 util::get_current_thread_id(),
1662 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1663 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1664 request, key);
1665}
1666
1667boost::future<bool>
1668IMapImpl::replace(const serialization::pimpl::data& key,
1669 const serialization::pimpl::data& old_value,
1670 const serialization::pimpl::data& new_value)
1671{
1672 auto request = protocol::codec::map_replaceifsame_encode(
1673 get_name(), key, old_value, new_value, util::get_current_thread_id());
1674
1675 return invoke_and_get_future<bool>(request, key);
1676}
1677
1678boost::future<boost::optional<serialization::pimpl::data>>
1679IMapImpl::replace_data(const serialization::pimpl::data& key,
1680 const serialization::pimpl::data& value)
1681{
1682 auto request = protocol::codec::map_replace_encode(
1683 get_name(), key, value, util::get_current_thread_id());
1684
1685 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1686 request, key);
1687}
1688
1689boost::future<protocol::ClientMessage>
1690IMapImpl::set(const serialization::pimpl::data& key,
1691 const serialization::pimpl::data& value,
1692 std::chrono::milliseconds ttl)
1693{
1694 auto request = protocol::codec::map_set_encode(
1695 get_name(),
1696 key,
1697 value,
1698 util::get_current_thread_id(),
1699 std::chrono::duration_cast<std::chrono::milliseconds>(ttl).count());
1700 return invoke_on_partition(request, get_partition_id(key));
1701}
1702
1703boost::future<protocol::ClientMessage>
1704IMapImpl::lock(const serialization::pimpl::data& key)
1705{
1706 return lock(key, std::chrono::milliseconds(-1));
1707}
1708
1709boost::future<protocol::ClientMessage>
1710IMapImpl::lock(const serialization::pimpl::data& key,
1711 std::chrono::milliseconds lease_time)
1712{
1713 auto request = protocol::codec::map_lock_encode(
1714 get_name(),
1715 key,
1716 util::get_current_thread_id(),
1717 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1718 lock_reference_id_generator_->get_next_reference_id());
1719 return invoke_on_partition(request, get_partition_id(key));
1720}
1721
1722boost::future<bool>
1723IMapImpl::is_locked(const serialization::pimpl::data& key)
1724{
1725 auto request = protocol::codec::map_islocked_encode(get_name(), key);
1726
1727 return invoke_and_get_future<bool>(request, key);
1728}
1729
1730boost::future<bool>
1731IMapImpl::try_lock(const serialization::pimpl::data& key,
1732 std::chrono::milliseconds timeout)
1733{
1734 return try_lock(key, timeout, std::chrono::milliseconds(-1));
1735}
1736
1737boost::future<bool>
1738IMapImpl::try_lock(const serialization::pimpl::data& key,
1739 std::chrono::milliseconds timeout,
1740 std::chrono::milliseconds lease_time)
1741{
1742 auto request = protocol::codec::map_trylock_encode(
1743 get_name(),
1744 key,
1745 util::get_current_thread_id(),
1746 std::chrono::duration_cast<std::chrono::milliseconds>(lease_time).count(),
1747 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count(),
1748 lock_reference_id_generator_->get_next_reference_id());
1749 return invoke_and_get_future<bool>(request, key);
1750}
1751
1752boost::future<protocol::ClientMessage>
1753IMapImpl::unlock(const serialization::pimpl::data& key)
1754{
1755 auto request = protocol::codec::map_unlock_encode(
1756 get_name(),
1757 key,
1758 util::get_current_thread_id(),
1759 lock_reference_id_generator_->get_next_reference_id());
1760 return invoke_on_partition(request, get_partition_id(key));
1761}
1762
1763boost::future<protocol::ClientMessage>
1764IMapImpl::force_unlock(const serialization::pimpl::data& key)
1765{
1766 auto request = protocol::codec::map_forceunlock_encode(
1767 get_name(), key, lock_reference_id_generator_->get_next_reference_id());
1768 return invoke_on_partition(request, get_partition_id(key));
1769}
1770
1771boost::future<std::string>
1772IMapImpl::add_interceptor(const serialization::pimpl::data& interceptor)
1773{
1774 auto request =
1775 protocol::codec::map_addinterceptor_encode(get_name(), interceptor);
1776 return invoke_and_get_future<std::string>(request);
1777}
1778
1779// TODO: We can use generic template Listener instead of impl::BaseEventHandler
1780// to prevent the virtual function calls
1781boost::future<boost::uuids::uuid>
1782IMapImpl::add_entry_listener(
1783 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1784 bool include_value,
1785 int32_t listener_flags)
1786{
1787 return register_listener(
1788 create_map_entry_listener_codec(include_value, listener_flags),
1789 std::move(entry_event_handler));
1790}
1791
1792boost::future<boost::uuids::uuid>
1793IMapImpl::add_entry_listener(
1794 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1795 serialization::pimpl::data&& predicate,
1796 bool include_value,
1797 int32_t listener_flags)
1798{
1799 return register_listener(
1800 create_map_entry_listener_codec(
1801 include_value, std::move(predicate), listener_flags),
1802 std::move(entry_event_handler));
1803}
1804
1805boost::future<bool>
1806IMapImpl::remove_entry_listener(boost::uuids::uuid registration_id)
1807{
1808 return get_context().get_client_listener_service().deregister_listener(
1809 registration_id);
1810}
1811
1812boost::future<boost::uuids::uuid>
1813IMapImpl::add_entry_listener(
1814 std::shared_ptr<impl::BaseEventHandler> entry_event_handler,
1815 bool include_value,
1816 serialization::pimpl::data&& key,
1817 int32_t listener_flags)
1818{
1819 return register_listener(create_map_entry_listener_codec(
1820 include_value, listener_flags, std::move(key)),
1821 std::move(entry_event_handler));
1822}
1823
1824boost::future<boost::optional<map::data_entry_view>>
1825IMapImpl::get_entry_view_data(const serialization::pimpl::data& key)
1826{
1827 auto request = protocol::codec::map_getentryview_encode(
1828 get_name(), key, util::get_current_thread_id());
1829 return invoke_and_get_future<boost::optional<map::data_entry_view>>(request,
1830 key);
1831}
1832
1833boost::future<bool>
1834IMapImpl::evict(const serialization::pimpl::data& key)
1835{
1836 auto request = protocol::codec::map_evict_encode(
1837 get_name(), key, util::get_current_thread_id());
1838 return invoke_and_get_future<bool>(request, key);
1839}
1840
1841boost::future<protocol::ClientMessage>
1842IMapImpl::evict_all()
1843{
1844 auto request = protocol::codec::map_evictall_encode(get_name());
1845 return invoke(request);
1846}
1847
1848boost::future<EntryVector>
1849IMapImpl::get_all_data(int partition_id,
1850 const std::vector<serialization::pimpl::data>& keys)
1851{
1852 auto request = protocol::codec::map_getall_encode(get_name(), keys);
1853 return invoke_and_get_future<EntryVector>(request, partition_id);
1854}
1855
1856boost::future<std::vector<serialization::pimpl::data>>
1857IMapImpl::key_set_data()
1858{
1859 auto request = protocol::codec::map_keyset_encode(get_name());
1860 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1861 request);
1862}
1863
1864boost::future<std::vector<serialization::pimpl::data>>
1865IMapImpl::key_set_data(const serialization::pimpl::data& predicate)
1866{
1867 auto request =
1868 protocol::codec::map_keysetwithpredicate_encode(get_name(), predicate);
1869 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1870 request);
1871}
1872
1873boost::future<
1874 std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1875IMapImpl::key_set_for_paging_predicate_data(
1876 protocol::codec::holder::paging_predicate_holder const& predicate)
1877{
1878 auto request = protocol::codec::map_keysetwithpagingpredicate_encode(
1879 get_name(), predicate);
1880 return invoke(request).then(
1881 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1882 return get_paging_predicate_response<
1883 std::vector<serialization::pimpl::data>>(std::move(f));
1884 });
1885}
1886
1887boost::future<EntryVector>
1888IMapImpl::entry_set_data()
1889{
1890 auto request = protocol::codec::map_entryset_encode(get_name());
1891 return invoke_and_get_future<EntryVector>(request);
1892}
1893
1894boost::future<EntryVector>
1895IMapImpl::entry_set_data(const serialization::pimpl::data& predicate)
1896{
1897 auto request =
1898 protocol::codec::map_entrieswithpredicate_encode(get_name(), predicate);
1899 return invoke_and_get_future<EntryVector>(request);
1900}
1901
1902boost::future<std::pair<EntryVector, query::anchor_data_list>>
1903IMapImpl::entry_set_for_paging_predicate_data(
1904 protocol::codec::holder::paging_predicate_holder const& predicate)
1905{
1906 auto request = protocol::codec::map_entrieswithpagingpredicate_encode(
1907 get_name(), predicate);
1908 return invoke(request).then(
1909 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1910 return get_paging_predicate_response<EntryVector>(std::move(f));
1911 });
1912}
1913
1914boost::future<std::vector<serialization::pimpl::data>>
1915IMapImpl::values_data()
1916{
1917 auto request = protocol::codec::map_values_encode(get_name());
1918 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1919 request);
1920}
1921
1922boost::future<std::vector<serialization::pimpl::data>>
1923IMapImpl::values_data(const serialization::pimpl::data& predicate)
1924{
1925 auto request =
1926 protocol::codec::map_valueswithpredicate_encode(get_name(), predicate);
1927 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
1928 request);
1929}
1930
1931boost::future<
1932 std::pair<std::vector<serialization::pimpl::data>, query::anchor_data_list>>
1933IMapImpl::values_for_paging_predicate_data(
1934 protocol::codec::holder::paging_predicate_holder const& predicate)
1935{
1936 auto request = protocol::codec::map_valueswithpagingpredicate_encode(
1937 get_name(), predicate);
1938 return invoke(request).then(
1939 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
1940 return get_paging_predicate_response<
1941 std::vector<serialization::pimpl::data>>(std::move(f));
1942 });
1943}
1944
1945boost::future<protocol::ClientMessage>
1946IMapImpl::add_index_data(const config::index_config& config)
1947{
1948 auto request = protocol::codec::map_addindex_encode(get_name(), config);
1949 return invoke(request);
1950}
1951
1952boost::future<int>
1953IMapImpl::size()
1954{
1955 auto request = protocol::codec::map_size_encode(get_name());
1956 return invoke_and_get_future<int>(request);
1957}
1958
1959boost::future<bool>
1960IMapImpl::is_empty()
1961{
1962 auto request = protocol::codec::map_isempty_encode(get_name());
1963 return invoke_and_get_future<bool>(request);
1964}
1965
1966boost::future<protocol::ClientMessage>
1967IMapImpl::put_all_data(int partition_id, const EntryVector& entries)
1968{
1969 auto request =
1970 protocol::codec::map_putall_encode(get_name(), entries, true);
1971 return invoke_on_partition(request, partition_id);
1972}
1973
1974boost::future<protocol::ClientMessage>
1975IMapImpl::clear_data()
1976{
1977 auto request = protocol::codec::map_clear_encode(get_name());
1978 return invoke(request);
1979}
1980
1981boost::future<boost::optional<serialization::pimpl::data>>
1982IMapImpl::execute_on_key_data(const serialization::pimpl::data& key,
1983 const serialization::pimpl::data& processor)
1984{
1985 auto request = protocol::codec::map_executeonkey_encode(
1986 get_name(), processor, key, util::get_current_thread_id());
1987 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
1988 request, get_partition_id(key));
1989}
1990
1991boost::future<boost::optional<serialization::pimpl::data>>
1992IMapImpl::submit_to_key_data(const serialization::pimpl::data& key,
1993 const serialization::pimpl::data& processor)
1994{
1995 auto request = protocol::codec::map_submittokey_encode(
1996 get_name(), processor, key, util::get_current_thread_id());
1997 return invoke_on_partition(request, get_partition_id(key))
1998 .then(boost::launch::sync, [](boost::future<protocol::ClientMessage> f) {
1999 auto msg = f.get();
2000 msg.skip_frame();
2001 return msg.get_nullable<serialization::pimpl::data>();
2002 });
2003}
2004
2005boost::future<EntryVector>
2006IMapImpl::execute_on_keys_data(
2007 const std::vector<serialization::pimpl::data>& keys,
2008 const serialization::pimpl::data& processor)
2009{
2010 auto request =
2011 protocol::codec::map_executeonkeys_encode(get_name(), processor, keys);
2012 return invoke_and_get_future<EntryVector>(request);
2013}
2014
2015boost::future<protocol::ClientMessage>
2016IMapImpl::remove_interceptor(const std::string& id)
2017{
2018 auto request =
2019 protocol::codec::map_removeinterceptor_encode(get_name(), id);
2020 return invoke(request);
2021}
2022
2023boost::future<EntryVector>
2024IMapImpl::execute_on_entries_data(
2025 const serialization::pimpl::data& entry_processor)
2026{
2027 auto request =
2028 protocol::codec::map_executeonallkeys_encode(get_name(), entry_processor);
2029 return invoke_and_get_future<EntryVector>(request);
2030}
2031
2032boost::future<EntryVector>
2033IMapImpl::execute_on_entries_data(
2034 const serialization::pimpl::data& entry_processor,
2035 const serialization::pimpl::data& predicate)
2036{
2037 auto request = protocol::codec::map_executewithpredicate_encode(
2038 get_name(), entry_processor, predicate);
2039 return invoke_and_get_future<EntryVector>(request);
2040}
2041
2042std::shared_ptr<spi::impl::ListenerMessageCodec>
2043IMapImpl::create_map_entry_listener_codec(
2044 bool include_value,
2045 serialization::pimpl::data&& predicate,
2046 int32_t listener_flags)
2047{
2048 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2049 new MapEntryListenerWithPredicateMessageCodec(
2050 get_name(), include_value, listener_flags, std::move(predicate)));
2051}
2052
2053std::shared_ptr<spi::impl::ListenerMessageCodec>
2054IMapImpl::create_map_entry_listener_codec(bool include_value,
2055 int32_t listener_flags)
2056{
2057 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2058 new MapEntryListenerMessageCodec(
2059 get_name(), include_value, listener_flags));
2060}
2061
2062std::shared_ptr<spi::impl::ListenerMessageCodec>
2063IMapImpl::create_map_entry_listener_codec(bool include_value,
2064 int32_t listener_flags,
2065 serialization::pimpl::data&& key)
2066{
2067 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2068 new MapEntryListenerToKeyCodec(
2069 get_name(), include_value, listener_flags, std::move(key)));
2070}
2071
2072void
2073IMapImpl::on_initialize()
2074{
2075 ProxyImpl::on_initialize();
2076 lock_reference_id_generator_ =
2077 get_context().get_lock_reference_id_generator();
2078}
2079
2080IMapImpl::MapEntryListenerMessageCodec::MapEntryListenerMessageCodec(
2081 std::string name,
2082 bool include_value,
2083 int32_t listener_flags)
2084 : name_(std::move(name))
2085 , include_value_(include_value)
2086 , listener_flags_(listener_flags)
2087{}
2088
2089protocol::ClientMessage
2090IMapImpl::MapEntryListenerMessageCodec::encode_add_request(
2091 bool local_only) const
2092{
2093 return protocol::codec::map_addentrylistener_encode(
2094 name_, include_value_, static_cast<int32_t>(listener_flags_), local_only);
2095}
2096
2097protocol::ClientMessage
2098IMapImpl::MapEntryListenerMessageCodec::encode_remove_request(
2099 boost::uuids::uuid real_registration_id) const
2100{
2101 return protocol::codec::map_removeentrylistener_encode(
2102 name_, real_registration_id);
2103}
2104
2105protocol::ClientMessage
2106IMapImpl::MapEntryListenerToKeyCodec::encode_add_request(bool local_only) const
2107{
2108 return protocol::codec::map_addentrylistenertokey_encode(
2109 name_,
2110 key_,
2111 include_value_,
2112 static_cast<int32_t>(listener_flags_),
2113 local_only);
2114}
2115
2116protocol::ClientMessage
2117IMapImpl::MapEntryListenerToKeyCodec::encode_remove_request(
2118 boost::uuids::uuid real_registration_id) const
2119{
2120 return protocol::codec::map_removeentrylistener_encode(
2121 name_, real_registration_id);
2122}
2123
2124IMapImpl::MapEntryListenerToKeyCodec::MapEntryListenerToKeyCodec(
2125 std::string name,
2126 bool include_value,
2127 int32_t listener_flags,
2128 serialization::pimpl::data key)
2129 : name_(std::move(name))
2130 , include_value_(include_value)
2131 , listener_flags_(listener_flags)
2132 , key_(std::move(key))
2133{}
2134
2135IMapImpl::MapEntryListenerWithPredicateMessageCodec::
2136 MapEntryListenerWithPredicateMessageCodec(
2137 std::string name,
2138 bool include_value,
2139 int32_t listener_flags,
2140 serialization::pimpl::data&& predicate)
2141 : name_(std::move(name))
2142 , include_value_(include_value)
2143 , listener_flags_(listener_flags)
2144 , predicate_(std::move(predicate))
2145{}
2146
2147protocol::ClientMessage
2148IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_add_request(
2149 bool local_only) const
2150{
2151 return protocol::codec::map_addentrylistenerwithpredicate_encode(
2152 name_,
2153 predicate_,
2154 include_value_,
2155 static_cast<int32_t>(listener_flags_),
2156 local_only);
2157}
2158
2159protocol::ClientMessage
2160IMapImpl::MapEntryListenerWithPredicateMessageCodec::encode_remove_request(
2161 boost::uuids::uuid real_registration_id) const
2162{
2163 return protocol::codec::map_removeentrylistener_encode(
2164 name_, real_registration_id);
2165}
2166
2167TransactionalQueueImpl::TransactionalQueueImpl(
2168 const std::string& name,
2169 txn::TransactionProxy& transaction_proxy)
2170 : TransactionalObject(iqueue::SERVICE_NAME, name, transaction_proxy)
2171{}
2172
2173boost::future<bool>
2174TransactionalQueueImpl::offer(const serialization::pimpl::data& e,
2175 std::chrono::milliseconds timeout)
2176{
2177 auto request = protocol::codec::transactionalqueue_offer_encode(
2178 get_name(),
2179 get_transaction_id(),
2180 util::get_current_thread_id(),
2181 e,
2182 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
2183
2184 return invoke_and_get_future<bool>(request);
2185}
2186
2187boost::future<boost::optional<serialization::pimpl::data>>
2188TransactionalQueueImpl::poll_data(std::chrono::milliseconds timeout)
2189{
2190 auto request = protocol::codec::transactionalqueue_poll_encode(
2191 get_name(),
2192 get_transaction_id(),
2193 util::get_current_thread_id(),
2194 std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count());
2195
2196 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
2197 request);
2198}
2199
2200boost::future<int>
2201TransactionalQueueImpl::size()
2202{
2203 auto request = protocol::codec::transactionalqueue_size_encode(
2204 get_name(), get_transaction_id(), util::get_current_thread_id());
2205
2206 return invoke_and_get_future<int>(request);
2207}
2208
2209ISetImpl::ISetImpl(const std::string& instance_name,
2210 spi::ClientContext* client_context)
2211 : ProxyImpl(iset::SERVICE_NAME, instance_name, client_context)
2212{
2213 serialization::pimpl::data key_data =
2214 get_context().get_serialization_service().to_data<std::string>(
2215 &instance_name);
2216 partition_id_ = get_partition_id(key_data);
2217}
2218
2219boost::future<bool>
2220ISetImpl::remove_item_listener(boost::uuids::uuid registration_id)
2221{
2222 return get_context().get_client_listener_service().deregister_listener(
2223 registration_id);
2224}
2225
2226boost::future<int>
2227ISetImpl::size()
2228{
2229 auto request = protocol::codec::set_size_encode(get_name());
2230 return invoke_and_get_future<int>(request, partition_id_);
2231}
2232
2233boost::future<bool>
2234ISetImpl::is_empty()
2235{
2236 auto request = protocol::codec::set_isempty_encode(get_name());
2237 return invoke_and_get_future<bool>(request, partition_id_);
2238}
2239
2240boost::future<bool>
2241ISetImpl::contains(const serialization::pimpl::data& element)
2242{
2243 auto request = protocol::codec::set_contains_encode(get_name(), element);
2244 return invoke_and_get_future<bool>(request, partition_id_);
2245}
2246
2247boost::future<std::vector<serialization::pimpl::data>>
2248ISetImpl::to_array_data()
2249{
2250 auto request = protocol::codec::set_getall_encode(get_name());
2251 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
2252 request, partition_id_);
2253}
2254
2255boost::future<bool>
2256ISetImpl::add(const serialization::pimpl::data& element)
2257{
2258 auto request = protocol::codec::set_add_encode(get_name(), element);
2259 return invoke_and_get_future<bool>(request, partition_id_);
2260}
2261
2262boost::future<bool>
2263ISetImpl::remove(const serialization::pimpl::data& element)
2264{
2265 auto request = protocol::codec::set_remove_encode(get_name(), element);
2266 return invoke_and_get_future<bool>(request, partition_id_);
2267}
2268
2269boost::future<bool>
2270ISetImpl::contains_all(const std::vector<serialization::pimpl::data>& elements)
2271{
2272 auto request =
2273 protocol::codec::set_containsall_encode(get_name(), elements);
2274 return invoke_and_get_future<bool>(request, partition_id_);
2275}
2276
2277boost::future<bool>
2278ISetImpl::add_all(const std::vector<serialization::pimpl::data>& elements)
2279{
2280 auto request = protocol::codec::set_addall_encode(get_name(), elements);
2281 return invoke_and_get_future<bool>(request, partition_id_);
2282}
2283
2284boost::future<bool>
2285ISetImpl::remove_all(const std::vector<serialization::pimpl::data>& elements)
2286{
2287 auto request =
2288 protocol::codec::set_compareandremoveall_encode(get_name(), elements);
2289 return invoke_and_get_future<bool>(request, partition_id_);
2290}
2291
2292boost::future<bool>
2293ISetImpl::retain_all(const std::vector<serialization::pimpl::data>& elements)
2294{
2295 auto request =
2296 protocol::codec::set_compareandretainall_encode(get_name(), elements);
2297 return invoke_and_get_future<bool>(request, partition_id_);
2298}
2299
2300boost::future<void>
2301ISetImpl::clear()
2302{
2303 auto request = protocol::codec::set_clear_encode(get_name());
2304 return to_void_future(invoke_on_partition(request, partition_id_));
2305}
2306
2307std::shared_ptr<spi::impl::ListenerMessageCodec>
2308ISetImpl::create_item_listener_codec(bool include_value)
2309{
2310 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2311 new SetListenerMessageCodec(get_name(), include_value));
2312}
2313
2314ISetImpl::SetListenerMessageCodec::SetListenerMessageCodec(std::string name,
2315 bool include_value)
2316 : name_(std::move(name))
2317 , include_value_(include_value)
2318{}
2319
2320protocol::ClientMessage
2321ISetImpl::SetListenerMessageCodec::encode_add_request(bool local_only) const
2322{
2323 return protocol::codec::set_addlistener_encode(
2324 name_, include_value_, local_only);
2325}
2326
2327protocol::ClientMessage
2328ISetImpl::SetListenerMessageCodec::encode_remove_request(
2329 boost::uuids::uuid real_registration_id) const
2330{
2331 return protocol::codec::set_removelistener_encode(name_,
2332 real_registration_id);
2333}
2334
2335ITopicImpl::ITopicImpl(const std::string& instance_name,
2336 spi::ClientContext* context)
2337 : proxy::ProxyImpl("hz:impl:topicService", instance_name, context)
2338 , partition_id_(get_partition_id(to_data(instance_name)))
2339{}
2340
2341boost::future<void>
2342ITopicImpl::publish(const serialization::pimpl::data& data)
2343{
2344 auto request = protocol::codec::topic_publish_encode(get_name(), data);
2345 return to_void_future(invoke_on_partition(request, partition_id_));
2346}
2347
2348boost::future<boost::uuids::uuid>
2349ITopicImpl::add_message_listener(
2350 std::shared_ptr<impl::BaseEventHandler> topic_event_handler)
2351{
2352 return register_listener(create_item_listener_codec(),
2353 std::move(topic_event_handler));
2354}
2355
2356boost::future<bool>
2357ITopicImpl::remove_message_listener(boost::uuids::uuid registration_id)
2358{
2359 return get_context().get_client_listener_service().deregister_listener(
2360 registration_id);
2361}
2362
2363std::shared_ptr<spi::impl::ListenerMessageCodec>
2364ITopicImpl::create_item_listener_codec()
2365{
2366 return std::shared_ptr<spi::impl::ListenerMessageCodec>(
2367 new TopicListenerMessageCodec(get_name()));
2368}
2369
2370ITopicImpl::TopicListenerMessageCodec::TopicListenerMessageCodec(
2371 std::string name)
2372 : name_(std::move(name))
2373{}
2374
2375protocol::ClientMessage
2376ITopicImpl::TopicListenerMessageCodec::encode_add_request(bool local_only) const
2377{
2378 return protocol::codec::topic_addmessagelistener_encode(name_, local_only);
2379}
2380
2381protocol::ClientMessage
2382ITopicImpl::TopicListenerMessageCodec::encode_remove_request(
2383 boost::uuids::uuid real_registration_id) const
2384{
2385 return protocol::codec::topic_removemessagelistener_encode(
2386 name_, real_registration_id);
2387}
2388
2389ReplicatedMapImpl::ReplicatedMapImpl(const std::string& service_name,
2390 const std::string& object_name,
2391 spi::ClientContext* context)
2392 : ProxyImpl(service_name, object_name, context)
2393 , target_partition_id_(-1)
2394{}
2395
2396const int32_t RingbufferImpl::MAX_BATCH_SIZE{ 1000 };
2397} // namespace proxy
2398
2399namespace map {
2400const serialization::pimpl::data&
2401data_entry_view::get_key() const
2402{
2403 return key_;
2404}
2405
2406const serialization::pimpl::data&
2407data_entry_view::get_value() const
2408{
2409 return value_;
2410}
2411
2412int64_t
2413data_entry_view::get_cost() const
2414{
2415 return cost_;
2416}
2417
2418int64_t
2419data_entry_view::get_creation_time() const
2420{
2421 return creation_time_;
2422}
2423
2424int64_t
2425data_entry_view::get_expiration_time() const
2426{
2427 return expiration_time_;
2428}
2429
2430int64_t
2431data_entry_view::get_hits() const
2432{
2433 return hits_;
2434}
2435
2436int64_t
2437data_entry_view::get_last_access_time() const
2438{
2439 return last_access_time_;
2440}
2441
2442int64_t
2443data_entry_view::get_last_stored_time() const
2444{
2445 return last_stored_time_;
2446}
2447
2448int64_t
2449data_entry_view::get_last_update_time() const
2450{
2451 return last_update_time_;
2452}
2453
2454int64_t
2455data_entry_view::get_version() const
2456{
2457 return version_;
2458}
2459
2460int64_t
2461data_entry_view::get_ttl() const
2462{
2463 return ttl_;
2464}
2465
2466int64_t
2467data_entry_view::get_max_idle() const
2468{
2469 return max_idle_;
2470}
2471
2472data_entry_view::data_entry_view(serialization::pimpl::data&& key,
2473 serialization::pimpl::data&& value,
2474 int64_t cost,
2475 int64_t creation_time,
2476 int64_t expiration_time,
2477 int64_t hits,
2478 int64_t last_access_time,
2479 int64_t last_stored_time,
2480 int64_t last_update_time,
2481 int64_t version,
2482 int64_t ttl,
2483 int64_t max_idle)
2484 : key_(std::move(key))
2485 , value_(std::move(value))
2486 , cost_(cost)
2487 , creation_time_(creation_time)
2488 , expiration_time_(expiration_time)
2489 , hits_(hits)
2490 , last_access_time_(last_access_time)
2491 , last_stored_time_(last_stored_time)
2492 , last_update_time_(last_update_time)
2493 , version_(version)
2494 , ttl_(ttl)
2495 , max_idle_(max_idle)
2496{}
2497} // namespace map
2498
2499namespace topic {
2500namespace impl {
2501namespace reliable {
2502ReliableTopicMessage::ReliableTopicMessage()
2503 : publish_time_(std::chrono::system_clock::now())
2504{}
2505
2506ReliableTopicMessage::ReliableTopicMessage(
2507 hazelcast::client::serialization::pimpl::data&& payload_data,
2508 std::unique_ptr<address> address)
2509 : publish_time_(std::chrono::system_clock::now())
2510 , payload_(std::move(payload_data))
2511{
2512 if (address) {
2513 publisher_address_ = boost::make_optional(*address);
2514 }
2515}
2516
2517std::chrono::system_clock::time_point
2518ReliableTopicMessage::get_publish_time() const
2519{
2520 return publish_time_;
2521}
2522
2523const boost::optional<address>&
2524ReliableTopicMessage::get_publisher_address() const
2525{
2526 return publisher_address_;
2527}
2528
2529serialization::pimpl::data&
2530ReliableTopicMessage::get_payload()
2531{
2532 return payload_;
2533}
2534} // namespace reliable
2535} // namespace impl
2536} // namespace topic
2537
2538namespace serialization {
2539int32_t
2540hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_factory_id()
2541{
2542 return F_ID;
2543}
2544
2545int
2546hz_serializer<topic::impl::reliable::ReliableTopicMessage>::get_class_id()
2547{
2548 return RELIABLE_TOPIC_MESSAGE;
2549}
2550
2551void
2552hz_serializer<topic::impl::reliable::ReliableTopicMessage>::write_data(
2553 const topic::impl::reliable::ReliableTopicMessage& object,
2554 object_data_output& out)
2555{
2556 out.write<int64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(
2557 object.publish_time_.time_since_epoch())
2558 .count());
2559 out.write_object(object.publisher_address_);
2560 out.write(object.payload_.to_byte_array());
2561}
2562
2563topic::impl::reliable::ReliableTopicMessage
2564hz_serializer<topic::impl::reliable::ReliableTopicMessage>::read_data(
2565 object_data_input& in)
2566{
2567 topic::impl::reliable::ReliableTopicMessage message;
2568 auto now = std::chrono::system_clock::now();
2569 message.publish_time_ = now +
2570 std::chrono::milliseconds(in.read<int64_t>()) -
2571 now.time_since_epoch();
2572 message.publisher_address_ = in.read_object<address>();
2573 message.payload_ =
2574 serialization::pimpl::data(in.read<std::vector<byte>>().value());
2575 return message;
2576}
2577} // namespace serialization
2578
2579entry_event::entry_event(const std::string& name,
2580 member&& member,
2581 type event_type,
2582 typed_data&& key,
2583 typed_data&& value,
2584 typed_data&& old_value,
2585 typed_data&& merging_value)
2586 : name_(name)
2587 , member_(std::move(member))
2588 , event_type_(event_type)
2589 , key_(std::move(key))
2590 , value_(std::move(value))
2591 , old_value_(std::move(old_value))
2592 , merging_value_(std::move(merging_value))
2593{}
2594
2595const typed_data&
2597{
2598 return key_;
2599}
2600
2601const typed_data&
2603{
2604 return old_value_;
2605}
2606
2607const typed_data&
2609{
2610 return value_;
2611}
2612
2613const typed_data&
2615{
2616 return merging_value_;
2617}
2618
2619const member&
2621{
2622 return member_;
2623}
2624
2625entry_event::type
2627{
2628 return event_type_;
2629}
2630
2631const std::string&
2633{
2634 return name_;
2635}
2636
2637std::ostream&
2638operator<<(std::ostream& os, const entry_event& event)
2639{
2640 os << "name: " << event.name_ << " member: " << event.member_
2641 << " eventType: " << static_cast<int>(event.event_type_)
2642 << " key: " << event.key_.get_type()
2643 << " value: " << event.value_.get_type()
2644 << " oldValue: " << event.old_value_.get_type()
2645 << " mergingValue: " << event.merging_value_.get_type();
2646 return os;
2647}
2648
2650 entry_event::type event_type,
2651 const std::string& name,
2652 int number_of_entries_affected)
2653 : member_(member)
2654 , event_type_(event_type)
2655 , name_(name)
2656 , number_of_entries_affected_(number_of_entries_affected)
2657{}
2658
2659const member&
2661{
2662 return member_;
2663}
2664
2665entry_event::type
2667{
2668 return event_type_;
2669}
2670
2671const std::string&
2673{
2674 return name_;
2675}
2676
2677int
2679{
2680 return number_of_entries_affected_;
2681}
2682
2683std::ostream&
2684operator<<(std::ostream& os, const map_event& event)
2685{
2686 os << "MapEvent{member: " << event.member_
2687 << " eventType: " << static_cast<int>(event.event_type_)
2688 << " name: " << event.name_
2689 << " numberOfEntriesAffected: " << event.number_of_entries_affected_;
2690 return os;
2691}
2692
2693item_event_base::item_event_base(const std::string& name,
2694 const member& member,
2695 const item_event_type& event_type)
2696 : name_(name)
2697 , member_(member)
2698 , event_type_(event_type)
2699{}
2700
2701const member&
2703{
2704 return member_;
2705}
2706
2707item_event_type
2709{
2710 return event_type_;
2711}
2712
2713const std::string&
2715{
2716 return name_;
2717}
2718
2719item_event_base::~item_event_base() = default;
2720
2721flake_id_generator::flake_id_generator(const std::string& object_name,
2722 spi::ClientContext* context)
2723 : flake_id_generator_impl(SERVICE_NAME, object_name, context)
2724{}
2725} // namespace client
2726} // namespace hazelcast
const typed_data & get_key() const
Returns the key of the entry event.
Definition proxy.cpp:2596
const std::string & get_name() const
Returns the name of the map for this event.
Definition proxy.cpp:2632
const typed_data & get_old_value() const
Returns the old value of the entry event.
Definition proxy.cpp:2602
type get_event_type() const
Return the event type.
Definition proxy.cpp:2626
const member & get_member() const
Returns the member fired this event.
Definition proxy.cpp:2620
const typed_data & get_value() const
Returns the value of the entry event.
Definition proxy.cpp:2608
const typed_data & get_merging_value() const
Returns the incoming merging value of the entry event.
Definition proxy.cpp:2614
static const std::chrono::milliseconds UNSET
Default TTL value of a record.
Definition imap.h:1136
item_event_type get_event_type() const
Return the event type.
Definition proxy.cpp:2708
const member & get_member() const
Returns the member fired this event.
Definition proxy.cpp:2702
const std::string & get_name() const
Returns the name of the collection for this event.
Definition proxy.cpp:2714
Map events common contract.
Definition map_event.h:36
const std::string & get_name() const
Returns the name of the map for this event.
Definition proxy.cpp:2672
entry_event::type get_event_type() const
Return the event type.
Definition proxy.cpp:2666
int get_number_of_entries_affected() const
Returns the number of entries affected by this event.
Definition proxy.cpp:2678
map_event(member &&member, entry_event::type event_type, const std::string &name, int number_of_entries_affected)
Constructor.
Definition proxy.cpp:2649
const member & get_member() const
Returns the member fired this event.
Definition proxy.cpp:2660
hz_cluster member class.
Definition member.h:62
Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subs...
bool remove_message_listener(const std::string &registration_id)
Stops receiving messages for the given message listener.
Definition proxy.cpp:66
reliable_listener(bool loss_tolerant, int64_t initial_sequence_id=-1)
Definition proxy.cpp:103
typed_data class is a wrapper class for the serialized binary data.
serialization::pimpl::object_type get_type() const
STL namespace.