// Client-side reliable topic class. It derives from proxy::ProxyImpl and from
// std::enable_shared_from_this, so member functions can hand out a
// shared/weak pointer to the instance (shared_from_this() is passed to the
// listener runners created by this class).
52class HAZELCAST_API reliable_topic
53 :
public proxy::ProxyImpl
54 ,
public std::enable_shared_from_this<reliable_topic>
// spi::ProxyManager and hazelcast_client are granted access to the
// non-public members of this class.
56 friend class spi::ProxyManager;
57 friend class hazelcast_client;
// Service name string associated with this class.
// NOTE(review): presumably the key used to look the service up - confirm
// against spi::ProxyManager.
60 static constexpr const char* SERVICE_NAME =
"hz:impl:reliableTopicService";
72 boost::future<void>
publish(
const E& message)
74 topic::impl::reliable::ReliableTopicMessage reliable_message(
75 to_data(message),
nullptr);
76 return to_void_future(ringbuffer_.get()->add(reliable_message));
// Listener registration, templated on the listener type.
// Steps visible here: take a fresh id from runner_counter_, wrap the
// forwarded listener in a MessageRunner, store the runner in runners_map_
// under that id, and return the id rendered as a decimal string.
97 template<
typename Listener>
// Pre-increment of the atomic counter: the first id handed out is 1.
100 int id = ++runner_counter_;
101 std::shared_ptr<MessageRunner<Listener>> runner(
102 new MessageRunner<Listener>(
id,
103 std::forward<Listener>(listener),
106 get_serialization_service(),
// The runner's last constructor parameter is a std::weak_ptr<reliable_topic>,
// so this shared_ptr is converted to a weak reference.
111 shared_from_this()));
// Keep the runner reachable by id.
112 runners_map_.put(
id, runner);
// The registration id returned to the caller is the numeric id as text.
114 return std::to_string(
id);
// Removes a message listener identified by its registration id.
// NOTE(review): registration_id is presumably the string returned when the
// listener was added, and the bool presumably reports whether a listener was
// found - the definition is out of line, confirm there.
125 bool remove_message_listener(
const std::string& registration_id);
// Lifecycle hooks overriding virtual functions of a base class. Only the
// declarations appear here; the bodies are defined elsewhere.
128 void on_shutdown()
override;
130 void on_destroy()
override;
132 void post_destroy()
override;
// Constant prefix "_hz_rb_".
// NOTE(review): presumably prepended to the topic name to form the name of the
// backing ringbuffer - confirm at the use site.
135 static constexpr const char* TOPIC_RB_PREFIX =
"_hz_rb_";
// Trailing parameter of a declaration: a raw pointer to the client context.
138 spi::ClientContext* context);
// MessageRunner: per-listener helper templated on the listener type. It is
// shareable via enable_shared_from_this and implements
// util::concurrent::Cancellable.
140 template<
typename Listener>
142 :
public std::enable_shared_from_this<MessageRunner<Listener>>
143 ,
public util::concurrent::Cancellable
// Constructor. Receives the runner id, the ringbuffer to read from, the topic
// name, the serialization service, the execution service, the executor and a
// weak reference to the owning topic.
146 MessageRunner(
int id,
148 const std::shared_ptr<ringbuffer>& rb,
149 const std::string& topic_name,
150 serialization::pimpl::SerializationService& service,
153 std::shared_ptr<spi::impl::ClientExecutionServiceImpl>
155 util::hz_thread_pool& executor,
// Weak, not shared: the runner does not own the topic.
156 std::weak_ptr<reliable_topic> topic)
157 : listener_(listener)
163 , execution_service_(
std::move(execution_service))
164 , executor_(executor)
165 , serialization_service_(service)
166 , batch_size_(batch_size)
167 , topic_(
std::move(topic))
// Starting sequence: the listener's initial_sequence_id_, where -1 means
// "not specified". In that case reading starts one past the ringbuffer's
// current tail sequence.
171 int64_t initialSequence = listener.initial_sequence_id_;
172 if (initialSequence == -1) {
// NOTE(review): tail_sequence() appears to return a future; get() presumably
// waits for the remote answer - confirm.
173 initialSequence = ringbuffer_->tail_sequence().get() + 1;
175 sequence_ = initialSequence;
// Virtual destructor with default behaviour.
178 virtual ~MessageRunner() =
default;
// Issues one asynchronous batch read against the ringbuffer and handles the
// outcome in a continuation run on executor_.
// The continuation captures, by value, a shared_ptr to this runner and a copy
// of execution_service_.
186 auto runner = this->shared_from_this();
187 auto execution_service = this->execution_service_;
// The read starts at sequence_; the other two arguments are 1 and batch_size_
// (presumably the minimum and maximum item counts - TODO confirm).
188 ringbuffer_->read_many(sequence_, 1, batch_size_)
189 .then(executor_, [runner,execution_service](boost::future<rb::read_result_set> f) {
// First check once the read completes: the cancellation flag.
190 if (runner->cancelled_) {
// Obtain the read result from the completed future.
// NOTE(review): f.get() presumably rethrows a failed read, which the outer
// catch further down would then see - confirm.
195 auto result = f.get();
// Gap computation: next sequence to read, minus the number of items read,
// minus the sequence this runner expected to read from.
201 result.get_next_sequence_to_read_from() -
202 result.read_count() - runner->sequence_;
// A non-zero gap is acted upon here only if the listener is not loss tolerant.
203 if (lost_count != 0 &&
204 !runner->listener_.loss_tolerant_) {
// Walk the returned items in order.
209 auto const& items = result.get_items();
210 for (
size_t i = 0; i < items.size(); i++) {
211 auto const& message = items[i];
// The item's sequence is passed to the listener's store_sequence_id_ callback
// before the item is processed.
213 runner->listener_.store_sequence_id_(
214 result.get_sequence(
static_cast<int>(i)));
// Extract the item as a ReliableTopicMessage and deliver it.
215 auto rel_msg = message.get<
216 topic::impl::reliable::ReliableTopicMessage>();
217 runner->process(*rel_msg);
218 }
// An iexception raised while handling an item is passed to terminate().
catch (exception::iexception& e) {
219 if (runner->terminate(e)) {
227 result.get_next_sequence_to_read_from();
229 }
// An iexception reaching this handler is passed to
// handle_internal_exception().
catch (exception::iexception& ie) {
230 if (runner->handle_internal_exception(ie)) {
// Handler for the TIMEOUT error code (it is dispatched from
// handle_internal_exception). The message built here states that the listener
// timed out and continues from the last known sequence.
239 bool handle_operation_timeout_exception()
244 boost::format(
"MessageListener on topic: %1% timed out. "
245 "Continuing from last known sequence: %2%") %
// Handler for the ILLEGAL_ARGUMENT error code (it is dispatched from
// handle_internal_exception).
// It fetches the ringbuffer's current head sequence. For a loss-tolerant
// listener, sequence_ is moved to that head; for other listeners a message
// citing a slow listener / violated retention period is built.
// @param e the exception being handled; its what() text goes into the
//          loss-tolerant message.
// Each format string declares exactly as many placeholders as arguments are
// fed to it: with its default exception settings boost::format throws
// (too_many_args / too_few_args) when the counts differ.
261 bool handle_illegal_argument_exception(exception::iexception& e)
265 int64_t remoteHeadSeq = ringbuffer_->head_sequence().get();
267 if (listener_.loss_tolerant_) {
272 "Terminating MessageListener: %1% on topic: %2%. "
273 "Reason: Underlying ring buffer data related to "
274 "reliable topic is lost. Details: %3% sequence: %4% head: %5%") %
275 id_ % name_ % e.what() % sequence_ % remoteHeadSeq));
// Resume from the current head of the ringbuffer.
276 sequence_ = remoteHeadSeq;
283 boost::str(boost::format(
284 "Terminating MessageListener: %1% on topic: %2%. "
285 "Reason: The listener was too slow or the retention "
286 "period of the message has been violated. "
287 "head: %3% sequence: %4%") %
288 id_ % name_ % remoteHeadSeq % sequence_));
// Dispatches on the error code of an exception caught by the batch read
// continuation.
// NOTE(review): the caller tests the bool result, but what a true/false
// result triggers is not shown here - confirm at the call site.
298 bool handle_internal_exception(exception::iexception& ie)
300 int32_t err = ie.get_error_code();
// Timeouts and illegal-argument failures have dedicated handlers.
303 case protocol::TIMEOUT:
304 return handle_operation_timeout_exception();
305 case protocol::ILLEGAL_ARGUMENT:
306 return handle_illegal_argument_exception(ie);
// Instance not active: a "Terminating ..." message is built.
307 case protocol::HAZELCAST_INSTANCE_NOT_ACTIVE:
312 "Terminating MessageListener %1% on topic: %2%. "
313 "Reason: HazelcastInstance is shutting down") %
// Topic destroyed: a "Terminating ..." message is built.
316 case protocol::DISTRIBUTED_OBJECT_DESTROYED:
321 "Terminating MessageListener %1% on topic: %2%. "
322 "Reason: Topic is destroyed") %
// Message including the exception's what() text (presumably the branch for
// any other error code - confirm).
330 "Terminating MessageListener %1% on topic: %2%. "
331 "Reason: Unhandled exception, details: %3%") %
332 id_ % name_ % ie.what()));
// Cancellable::cancel override. It raises the cancelled_ flag, removes this
// runner from the owning topic's runners_map_ and invokes the listener's
// on_cancel_ callback.
337 bool cancel()
override
339 cancelled_.store(
true);
// topic_ is a weak_ptr; lock() yields an empty pointer if the topic no longer
// exists.
340 auto topic_ptr = topic_.lock();
342 topic_ptr->runners_map_.remove(id_);
344 listener_.on_cancel_();
348 bool is_cancelled()
override {
return cancelled_.load(); }
351 void process(topic::impl::reliable::ReliableTopicMessage& message)
353 listener_.received_(to_message(message));
// Converts a ReliableTopicMessage into the topic::message given to listeners.
356 topic::message to_message(
357 topic::impl::reliable::ReliableTopicMessage& message)
// The publishing member is optional: it is set only when the message carries
// a publisher address.
359 boost::optional<member> m;
360 auto& addr = message.get_publisher_address();
361 if (addr.has_value()) {
362 m = boost::make_optional<member>(addr.value());
// The result is built from the topic name, the payload wrapped in typed_data
// together with the serialization service, and the publish time.
// NOTE(review): std::move on get_payload() presumably leaves the source
// message's payload moved-from - confirm get_payload() returns a non-const
// reference.
364 return topic::message(name_,
365 typed_data(std::move(message.get_payload()),
366 serialization_service_),
367 message.get_publish_time(),
// Asks the listener whether a failure raised while handling a message should
// end the listener.
// @param failure the exception raised during message handling.
371 bool terminate(
const exception::iexception& failure)
// The decision is delegated to the listener's terminal_ callback.
378 bool terminate = listener_.terminal_(failure);
// Message for the terminating case.
384 "Terminating ReliableListener %1% "
386 "Reason: Unhandled exception, details: %3%") %
387 id_ % name_ % failure.what()));
// Message for the case where the listener merely ran into an exception.
391 boost::str(boost::format(
392 "ReliableListener %1% on topic: %2% "
393 "ran into an exception, details: %3%") %
394 id_ % name_ % failure.what()));
397 }
// An iexception raised by the decision callback itself ends up here.
catch (exception::iexception& t) {
402 "Terminating ReliableListener %1% on topic: %2%. "
403 "Reason: Unhandled exception while calling the "
405 "ReliableListener::terminate_on_exception. %3%") %
406 id_ % name_ % t.what()));
// Ringbuffer this runner reads batches from.
414 std::shared_ptr<ringbuffer> ringbuffer_;
// Cancellation flag: set by cancel(), read by is_cancelled() and by the batch
// read continuation.
416 std::atomic<bool> cancelled_;
// Topic name used in this runner's log messages and in the topic::message
// objects it builds. It is held by value: the runner keeps only a
// std::weak_ptr to its reliable_topic, so a reference to a string owned
// elsewhere could dangle if the runner outlives that owner.
const std::string name_;
// Shared handle to the client execution service.
419 std::shared_ptr<spi::impl::ClientExecutionServiceImpl>
// Thread pool on which the batch read continuation is scheduled; held by
// reference, so it must outlive this runner.
421 util::hz_thread_pool& executor_;
// Serialization service handed to the typed_data built for each message; held
// by reference.
422 serialization::pimpl::SerializationService& serialization_service_;
// Weak reference to the owning topic; cancel() locks it to unregister this
// runner.
424 std::weak_ptr<reliable_topic> topic_;
// Registered runners keyed by their integer id.
427 util::SynchronizedMap<int, util::concurrent::Cancellable> runners_map_;
// Source of runner ids; starts at 0 and is pre-incremented for each new
// listener.
428 std::atomic<int> runner_counter_{ 0 };
// Execution service and thread pool used by this topic.
429 std::shared_ptr<spi::impl::ClientExecutionServiceImpl> execution_service_;
430 util::hz_thread_pool& executor_;
// Future yielding the backing ringbuffer; publish() calls get() on it before
// adding a message.
433 boost::shared_future<std::shared_ptr<ringbuffer>> ringbuffer_;