22 #include "hazelcast/client/ringbuffer.h"
23 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
24 #include "hazelcast/util/Preconditions.h"
25 #include "hazelcast/util/concurrent/Cancellable.h"
26 #include "hazelcast/logger.h"
27 #include "hazelcast/client/topic/impl/reliable/ReliableTopicMessage.h"
29 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
31 #pragma warning(disable: 4251)
50 friend class spi::ProxyManager;
53 static constexpr
const char *SERVICE_NAME =
"hz:impl:reliableTopicService";
64 boost::future<void>
publish(
const E &message) {
65 topic::impl::reliable::ReliableTopicMessage reliable_message(to_data(message),
nullptr);
66 return to_void_future(ringbuffer_.get()->add(reliable_message));
86 template<
typename Listener>
88 int id = ++runner_counter_;
89 std::shared_ptr<MessageRunner < Listener>>
90 runner(
new MessageRunner<Listener>(
id, std::forward<Listener>(listener), ringbuffer_.get(), get_name(),
91 get_serialization_service(), batch_size_, logger_, executor_,
93 runners_map_.put(
id, runner);
95 return std::to_string(
id);
106 bool remove_message_listener(
const std::string ®istration_id);
108 void on_shutdown()
override;
110 void on_destroy()
override;
112 void post_destroy()
override;
115 static constexpr
const char *TOPIC_RB_PREFIX =
"_hz_rb_";
117 reliable_topic(
const std::string &instance_name, spi::ClientContext *context);
119 template<
typename Listener>
121 :
public std::enable_shared_from_this<MessageRunner<Listener>>,
122 public util::concurrent::Cancellable {
124 MessageRunner(
int id, Listener &&listener,
const std::shared_ptr<ringbuffer> &rb,
125 const std::string &topic_name, serialization::pimpl::SerializationService &service,
126 int batch_size,
logger &lg, util::hz_thread_pool &executor,
127 util::SynchronizedMap<int, util::concurrent::Cancellable> &runners_map)
128 : listener_(listener), id_(id), ringbuffer_(rb), cancelled_(false), logger_(lg),
129 name_(topic_name), executor_(executor), serialization_service_(service),
130 batch_size_(batch_size), runners_map_(runners_map) {
132 int64_t initialSequence = listener.initial_sequence_id_;
133 if (initialSequence == -1) {
134 initialSequence = ringbuffer_->tail_sequence().get() + 1;
136 sequence_ = initialSequence;
139 virtual ~MessageRunner() =
default;
146 auto runner = this->shared_from_this();
147 ringbuffer_->read_many(sequence_, 1, batch_size_).then(executor_, [runner](
148 boost::future<rb::read_result_set> f) {
149 if (runner->cancelled_) {
154 auto result = f.get();
158 auto lost_count = result.get_next_sequence_to_read_from() - result.read_count() - runner->sequence_;
159 if (lost_count != 0 && !runner->listener_.loss_tolerant_) {
164 auto const &items = result.get_items();
165 for (
size_t i = 0; i < items.size(); i++) {
166 auto const &message = items[i];
168 runner->listener_.store_sequence_id_(result.get_sequence(
static_cast<int>(i)));
169 auto rel_msg = message.get<topic::impl::reliable::ReliableTopicMessage>();
170 runner->process(*rel_msg);
171 }
catch (exception::iexception &e) {
172 if (runner->terminate(e)) {
179 runner->sequence_ = result.get_next_sequence_to_read_from();
181 }
catch (exception::iexception &ie) {
182 if (runner->handle_internal_exception(ie)) {
192 bool handle_operation_timeout_exception() {
193 HZ_LOG(logger_, finest,
194 boost::str(boost::format(
"MessageListener on topic: %1% timed out. "
195 "Continuing from last known sequence: %2%")
211 bool handle_illegal_argument_exception(exception::iexception &e) {
213 int64_t remoteHeadSeq = ringbuffer_->head_sequence().get();
215 if (listener_.loss_tolerant_) {
216 HZ_LOG(logger_, finest,
217 boost::str(boost::format(
"Terminating MessageListener: %1% on topic: %2% ."
218 "Reason: Underlying ring buffer data related to reliable topic is lost.")
219 % id_ % name_ %e.what() % sequence_ % remoteHeadSeq)
221 sequence_ = remoteHeadSeq;
225 HZ_LOG(logger_, warning,
226 boost::str(boost::format(
"Terminating MessageListener: %1% on topic: %2%"
227 "Reason: The listener was too slow or the retention "
228 "period of the message has been violated. "
229 "head: %3% sequence: %4%")
230 % id_ % name_ % remoteHeadSeq % sequence_)
239 bool handle_internal_exception(exception::iexception &ie) {
240 int32_t err = ie.get_error_code();
243 case protocol::TIMEOUT:
244 return handle_operation_timeout_exception();
245 case protocol::ILLEGAL_ARGUMENT:
246 return handle_illegal_argument_exception(ie);
247 case protocol::HAZELCAST_INSTANCE_NOT_ACTIVE:
248 HZ_LOG(logger_, finest,
249 boost::str(boost::format(
"Terminating MessageListener %1% on topic: %2%. "
250 "Reason: HazelcastInstance is shutting down")
253 case protocol::DISTRIBUTED_OBJECT_DESTROYED:
254 HZ_LOG(logger_, finest,
255 boost::str(boost::format(
"Terminating MessageListener %1% on topic: %2%. "
256 "Reason: Topic is destroyed")
260 HZ_LOG(logger_, warning,
261 boost::str(boost::format(
"Terminating MessageListener %1% on topic: %2%. "
262 "Reason: Unhandled exception, details: %3%")
263 % id_ % name_ % ie.what())
269 bool cancel()
override {
270 cancelled_.store(
true);
271 runners_map_.remove(id_);
275 bool is_cancelled()
override {
276 return cancelled_.load();
279 void process(topic::impl::reliable::ReliableTopicMessage &message) {
280 listener_.received_(to_message(message));
283 topic::message to_message(topic::impl::reliable::ReliableTopicMessage &message) {
284 boost::optional<member> m;
285 auto &addr = message.get_publisher_address();
286 if (addr.has_value()) {
287 m = boost::make_optional<member>(addr.value());
289 return topic::message(name_, typed_data(std::move(message.get_payload()), serialization_service_),
290 message.get_publish_time(), std::move(m));
293 bool terminate(
const exception::iexception &failure) {
299 bool terminate = listener_.terminal_(failure);
301 HZ_LOG(logger_, warning,
302 boost::str(boost::format(
"Terminating ReliableListener %1% "
304 "Reason: Unhandled exception, details: %3%")
305 % id_ % name_ % failure.what())
308 HZ_LOG(logger_, finest,
309 boost::str(boost::format(
"ReliableListener %1% on topic: %2% "
310 "ran into an exception, details: %3%")
311 % id_ % name_ % failure.what())
315 }
catch (exception::iexception &t) {
316 HZ_LOG(logger_, warning,
317 boost::str(boost::format(
"Terminating ReliableListener %1% on topic: %2%. "
318 "Reason: Unhandled exception while calling the function set by "
319 "ReliableListener::terminate_on_exception. %3%")
320 % id_ % name_ % t.what())
329 std::shared_ptr<ringbuffer> ringbuffer_;
331 std::atomic<bool> cancelled_;
333 const std::string &name_;
334 util::hz_thread_pool &executor_;
335 serialization::pimpl::SerializationService &serialization_service_;
337 util::SynchronizedMap<int, util::concurrent::Cancellable> &runners_map_;
340 util::SynchronizedMap<int, util::concurrent::Cancellable> runners_map_;
341 std::atomic<int> runner_counter_{ 0 };
342 util::hz_thread_pool &executor_;
345 boost::shared_future<std::shared_ptr<ringbuffer>> ringbuffer_;
350 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subs...
std::string add_message_listener(Listener &&listener)
Subscribes to this topic.
boost::future< void > publish(const E &message)
Publishes the message to all subscribers of this topic Current implementation only supports DISCARD_O...