22 #include "hazelcast/client/ringbuffer.h"
23 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
24 #include "hazelcast/util/Preconditions.h"
25 #include "hazelcast/util/concurrent/Cancellable.h"
26 #include "hazelcast/logger.h"
27 #include "hazelcast/client/topic/impl/reliable/ReliableTopicMessage.h"
29 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
31 #pragma warning(disable : 4251)
54 friend class spi::ProxyManager;
// String identifier of the reliable topic service.
// NOTE(review): presumably the key under which spi::ProxyManager (declared a
// friend of this class) resolves reliable topic proxies — confirm.
58 static constexpr
const char* SERVICE_NAME =
"hz:impl:reliableTopicService";
// Publishes `message` on this topic.
//
// The message is converted with to_data() and wrapped in a
// ReliableTopicMessage whose second constructor argument is nullptr, then
// appended to the backing ringbuffer with add(). The future returned by add()
// is converted with to_void_future() and returned to the caller.
70 boost::future<void>
publish(
const E& message)
72 topic::impl::reliable::ReliableTopicMessage reliable_message(
73 to_data(message),
nullptr);
// ringbuffer_ is declared as a boost::shared_future, so get() waits until the
// ringbuffer pointer is available before add() is called.
74 return to_void_future(ringbuffer_.get()->add(reliable_message));
// Registers a message listener.
//
// Takes the next integer id from runner_counter_, builds a
// MessageRunner<Listener> for it, stores that runner in runners_map_ under
// the id and returns the id formatted as a decimal string.
// NOTE(review): the returned string is presumably the registration id
// accepted by remove_message_listener() — confirm in its definition.
95 template<
typename Listener>
// Pre-increment on the atomic counter: each call gets a distinct id.
98 int id = ++runner_counter_;
99 std::shared_ptr<MessageRunner<Listener>> runner(
100 new MessageRunner<Listener>(
id,
101 std::forward<Listener>(listener),
104 get_serialization_service(),
// Keep the runner reachable by id.
109 runners_map_.put(
id, runner);
111 return std::to_string(
id);
// Removes a message listener identified by `registration_id`.
// NOTE(review): the definition is not in this header section; the id is
// presumably the string returned by add_message_listener(), and the meaning
// of the bool result should be confirmed in the implementation.
122 bool remove_message_listener(
const std::string& registration_id);
// Lifecycle hooks overridden from a base class (all three are marked
// `override`). Their definitions are not in this header section.
// NOTE(review): presumably these cancel the registered runners held in
// runners_map_ — confirm in the implementation file.
125 void on_shutdown()
override;
127 void on_destroy()
override;
129 void post_destroy()
override;
// Prefix string "_hz_rb_".
// NOTE(review): judging by the name, this is prepended to the topic name to
// form the name of the backing ringbuffer — confirm where it is used.
132 static constexpr
const char* TOPIC_RB_PREFIX =
"_hz_rb_";
135 spi::ClientContext* context);
// MessageRunner<Listener>: per-listener object that reads from the ringbuffer
// and dispatches messages to one listener.
//
// It derives from std::enable_shared_from_this, so it can hand a shared_ptr
// to itself to asynchronous continuations, and from
// util::concurrent::Cancellable, whose cancel()/is_cancelled() it overrides.
137 template<
typename Listener>
139 :
public std::enable_shared_from_this<MessageRunner<Listener>>
140 ,
public util::concurrent::Cancellable
// Constructor. The visible parameters are the runner id, the ringbuffer, the
// topic name, the serialization service, the executor and a reference to the
// map of registered runners.
// NOTE(review): the listener and batch_size parameters are used in the
// initializer list but their declarations are not visible in this listing.
143 MessageRunner(
int id,
145 const std::shared_ptr<ringbuffer>& rb,
146 const std::string& topic_name,
147 serialization::pimpl::SerializationService& service,
150 util::hz_thread_pool& executor,
151 util::SynchronizedMap<int, util::concurrent::Cancellable>&
153 : listener_(listener)
159 , executor_(executor)
160 , serialization_service_(service)
161 , batch_size_(batch_size)
162 , runners_map_(runners_map)
// Starting sequence: use the listener's initial_sequence_id_ unless it is -1,
// in which case start just after the ringbuffer's current tail sequence.
// The .get() on tail_sequence() waits for that result inside the constructor.
166 int64_t initialSequence = listener.initial_sequence_id_;
167 if (initialSequence == -1) {
168 initialSequence = ringbuffer_->tail_sequence().get() + 1;
170 sequence_ = initialSequence;
// Virtual destructor with the default implementation.
173 virtual ~MessageRunner() =
default;
// Issues one asynchronous batch read starting at sequence_ and handles the
// result in a continuation scheduled on executor_.
// NOTE(review): the method signature and several statements are not visible
// in this listing; the comments below describe only the visible lines.
//
// A shared_ptr to this runner is captured by the continuation.
181 auto runner = this->shared_from_this();
// NOTE(review): the arguments 1 and batch_size_ are presumably the minimum
// and maximum number of items to read — confirm against the ringbuffer API.
182 ringbuffer_->read_many(sequence_, 1, batch_size_)
183 .then(executor_, [runner](boost::future<rb::read_result_set> f) {
// The cancellation flag is checked before the result is touched.
184 if (runner->cancelled_) {
189 auto result = f.get();
195 result.get_next_sequence_to_read_from() -
196 result.read_count() - runner->sequence_;
// A non-zero lost_count is only acted on here when the listener is not
// loss tolerant.
197 if (lost_count != 0 &&
198 !runner->listener_.loss_tolerant_) {
// Deliver the items in order: record each item's sequence id with the
// listener, then extract the ReliableTopicMessage and process it.
203 auto const& items = result.get_items();
204 for (
size_t i = 0; i < items.size(); i++) {
205 auto const& message = items[i];
207 runner->listener_.store_sequence_id_(
208 result.get_sequence(
static_cast<int>(i)));
209 auto rel_msg = message.get<
210 topic::impl::reliable::ReliableTopicMessage>();
211 runner->process(*rel_msg);
212 }
catch (exception::iexception& e) {
// An exception raised while handling one item is passed to terminate(),
// and its boolean result is tested.
213 if (runner->terminate(e)) {
221 result.get_next_sequence_to_read_from();
223 }
catch (exception::iexception& ie) {
// Exceptions caught at this level are classified by
// handle_internal_exception(), and its boolean result is tested.
224 if (runner->handle_internal_exception(ie)) {
// Handles a timed-out read: formats a message stating that the listener
// timed out and that reading continues from the last known sequence.
// NOTE(review): the format arguments and the return statement are not visible
// here; the message text suggests the topic name and sequence_ are supplied
// and that the runner keeps going — confirm.
233 bool handle_operation_timeout_exception()
238 boost::format(
"MessageListener on topic: %1% timed out. "
239 "Continuing from last known sequence: %2%") %
// Handles an illegal-argument failure reported for a read.
//
// Fetches the ringbuffer's current head sequence (waiting on the result).
// If the listener is loss tolerant, a message is formatted and sequence_ is
// moved to that head sequence; otherwise a termination message containing the
// head sequence and the current sequence is formatted.
//
// e: the exception being handled; its what() text is included in the
//    loss-tolerant message.
// NOTE(review): the return statements are not visible in this listing.
255 bool handle_illegal_argument_exception(exception::iexception& e)
259 int64_t remoteHeadSeq = ringbuffer_->head_sequence().get();
261 if (listener_.loss_tolerant_) {
// The format string must consume all five arguments fed below: boost::format
// throws too_many_args when more arguments are supplied than it has
// placeholders.
// NOTE(review): the text says "Terminating" although this branch advances
// sequence_ to the head sequence — confirm the intended wording.
266 "Terminating MessageListener: %1% on topic: %2%. "
267 "Reason: Underlying ring buffer data related to "
268 "reliable topic is lost. cause: %3% sequence: %4% head: %5%") %
269 id_ % name_ % e.what() % sequence_ % remoteHeadSeq));
// Skip ahead to the oldest sequence still held by the ringbuffer.
270 sequence_ = remoteHeadSeq;
277 boost::str(boost::format(
278 "Terminating MessageListener: %1% on topic: %2%. "
279 "Reason: The listener was too slow or the retention "
280 "period of the message has been violated. "
281 "head: %3% sequence: %4%") %
282 id_ % name_ % remoteHeadSeq % sequence_));
// Classifies an exception by its error code and dispatches accordingly.
//
// ie: the exception to classify; get_error_code() selects the case and
//     what() is included in the fallback message.
// Visible cases:
//   protocol::TIMEOUT                       -> handle_operation_timeout_exception()
//   protocol::ILLEGAL_ARGUMENT              -> handle_illegal_argument_exception(ie)
//   protocol::HAZELCAST_INSTANCE_NOT_ACTIVE -> formats an "instance is shutting down" message
//   protocol::DISTRIBUTED_OBJECT_DESTROYED  -> formats a "topic is destroyed" message
// The last message ("Unhandled exception") is presumably the default case.
// NOTE(review): the switch statement, the log calls and the return values of
// the last three paths are not visible in this listing.
292 bool handle_internal_exception(exception::iexception& ie)
294 int32_t err = ie.get_error_code();
297 case protocol::TIMEOUT:
298 return handle_operation_timeout_exception();
299 case protocol::ILLEGAL_ARGUMENT:
300 return handle_illegal_argument_exception(ie);
301 case protocol::HAZELCAST_INSTANCE_NOT_ACTIVE:
306 "Terminating MessageListener %1% on topic: %2%. "
307 "Reason: HazelcastInstance is shutting down") %
309 case protocol::DISTRIBUTED_OBJECT_DESTROYED:
314 "Terminating MessageListener %1% on topic: %2%. "
315 "Reason: Topic is destroyed") %
322 "Terminating MessageListener %1% on topic: %2%. "
323 "Reason: Unhandled exception, details: %3%") %
324 id_ % name_ % ie.what()));
// Cancellable override: sets the atomic cancelled_ flag to true and removes
// this runner's entry (keyed by id_) from runners_map_.
// NOTE(review): the return statement is not visible in this listing.
329 bool cancel()
override
331 cancelled_.store(
true);
332 runners_map_.remove(id_);
// Cancellable override: returns the current value of the atomic cancelled_
// flag.
336 bool is_cancelled()
override {
return cancelled_.load(); }
// Converts a ReliableTopicMessage with to_message() and passes the result to
// the listener's received_ callback.
339 void process(topic::impl::reliable::ReliableTopicMessage& message)
341 listener_.received_(to_message(message));
// Builds a topic::message from a ReliableTopicMessage.
//
// The result carries the topic name (name_), the payload wrapped as
// typed_data together with serialization_service_, and the publish time.
// A member is built from the publisher address only when that address has a
// value; otherwise the optional member stays empty.
// NOTE(review): the remaining constructor arguments after the publish time
// are not visible in this listing.
344 topic::message to_message(
345 topic::impl::reliable::ReliableTopicMessage& message)
347 boost::optional<member> m;
348 auto& addr = message.get_publisher_address();
349 if (addr.has_value()) {
350 m = boost::make_optional<member>(addr.value());
// NOTE(review): std::move is applied to the result of get_payload(); if that
// returns a non-const reference, the payload is moved out of `message` —
// confirm callers do not reuse the payload afterwards.
352 return topic::message(name_,
353 typed_data(std::move(message.get_payload()),
354 serialization_service_),
355 message.get_publish_time(),
// Asks the listener whether a failure should end this runner.
//
// failure: the exception raised while handling a message; it is passed to
//          the listener's terminal_ callback and its what() text is logged.
// The callback's boolean result is stored in the local `terminate`. One
// message is formatted for the "Terminating" outcome and another for the
// "ran into an exception" outcome. An iexception caught in this function
// produces a third, "Terminating ... Unhandled exception" message.
// NOTE(review): the try block opener, the branch on the callback result and
// the return statements are not visible in this listing.
359 bool terminate(
const exception::iexception& failure)
366 bool terminate = listener_.terminal_(failure);
372 "Terminating ReliableListener %1% "
374 "Reason: Unhandled exception, details: %3%") %
375 id_ % name_ % failure.what()));
379 boost::str(boost::format(
380 "ReliableListener %1% on topic: %2% "
381 "ran into an exception, details: %3%") %
382 id_ % name_ % failure.what()));
385 }
catch (exception::iexception& t) {
390 "Terminating ReliableListener %1% on topic: %2%. "
391 "Reason: Unhandled exception while calling the "
393 "ReliableListener::terminate_on_exception. %3%") %
394 id_ % name_ % t.what()));
// MessageRunner state (some members are not visible in this listing).
// Ringbuffer this runner reads from.
402 std::shared_ptr<ringbuffer> ringbuffer_;
// Set by cancel() and checked in the read continuation.
404 std::atomic<bool> cancelled_;
// Topic name, held by reference.
// NOTE(review): name_, executor_, serialization_service_ and runners_map_ are
// reference members; the referenced objects must outlive this runner,
// including any continuation that still holds a shared_ptr to it — confirm.
406 const std::string& name_;
// Executor on which read continuations are scheduled.
407 util::hz_thread_pool& executor_;
// Passed to typed_data when a received payload is wrapped.
408 serialization::pimpl::SerializationService& serialization_service_;
// Map of registered runners; cancel() removes this runner's entry from it.
410 util::SynchronizedMap<int, util::concurrent::Cancellable>& runners_map_;
// Registered runners keyed by the integer id assigned at registration.
413 util::SynchronizedMap<int, util::concurrent::Cancellable> runners_map_;
// Source of runner ids; starts at 0 and is pre-incremented at registration.
414 std::atomic<int> runner_counter_{ 0 };
// Executor passed to the runners.
415 util::hz_thread_pool& executor_;
// Future for the backing ringbuffer; publish() calls get() on it before
// adding a message.
418 boost::shared_future<std::shared_ptr<ringbuffer>> ringbuffer_;
423 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
Hazelcast provides a distribution mechanism for publishing messages that are delivered to multiple subs...
std::string add_message_listener(Listener &&listener)
Subscribes to this topic.
boost::future< void > publish(const E &message)
Publishes the message to all subscribers of this topic. The current implementation only supports DISCARD_O...