22 #include "hazelcast/client/proxy/ReliableTopicImpl.h" 23 #include "hazelcast/client/ringbuffer.h" 24 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h" 25 #include "hazelcast/util/Preconditions.h" 26 #include "hazelcast/util/concurrent/Cancellable.h" 27 #include "hazelcast/logger.h" 28 #include "hazelcast/client/topic/impl/reliable/ReliableTopicMessage.h" 29 #include "hazelcast/client/topic/impl/reliable/ReliableTopicExecutor.h" 31 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64) 33 #pragma warning(disable: 4251) //for dll export 52 friend class spi::ProxyManager;
// Service name constant of this topic type; its value is "hz:impl:topicService".
55 static constexpr
const char *SERVICE_NAME =
"hz:impl:topicService";
// Publishes `message` on this topic: the message is converted with to_data()
// and passed to proxy::ReliableTopicImpl::publish(); the result of that call is
// returned as a boost::future<void>.
// NOTE(review): E is presumably a template parameter declared on a line that is
// missing from this listing, as is the closing brace -- confirm against the header.
66 boost::future<void>
publish(
const E &message) {
67 return proxy::ReliableTopicImpl::publish(to_data(message));
// Registers a listener on this topic. Visible steps:
//   1. take the next id by pre-incrementing runner_counter_;
//   2. build a MessageRunner<Listener> for that id, forwarding the listener and
//      passing ringbuffer_, get_name(), get_serialization_service(), config_
//      and logger_;
//   3. store the runner in runners_map_ under the id;
//   4. return the id rendered with std::to_string.
// NOTE(review): listing lines 88 (the signature) and 94 (one statement between
// the put and the return) are not present here -- confirm against the header
// before relying on this summary.
87 template<
typename Listener>
89 int id = ++runner_counter_;
90 std::shared_ptr<MessageRunner < Listener>>
91 runner(
new MessageRunner<Listener>(
id, std::forward<Listener>(listener), ringbuffer_, get_name(),
92 get_serialization_service(), config_, logger_));
93 runners_map_.put(
id, runner);
95 return std::to_string(
id);
/**
 * Removes a message listener registration from this topic.
 *
 * Only the declaration is visible here; the definition lives elsewhere.
 *
 * @param registration_id registration id string. add_message_listener returns
 *        std::to_string(id) for the runner it stores, so this is presumably
 *        that value -- confirm against the definition.
 * @return bool; presumably true when a registration was found and removed --
 *         confirm against the definition.
 */
bool remove_message_listener(const std::string &registration_id);
// Override of the base class's on_destroy() hook; only the declaration is
// visible here, so what it does on destruction cannot be stated from this listing.
108 void on_destroy()
override;
// Constructor declaration (defined elsewhere). Takes the ringbuffer as a
// shared_ptr, the instance name, and a pointer to the client context.
// NOTE(review): spi::ProxyManager is declared a friend above, so instances are
// presumably created by the proxy manager rather than by user code -- confirm.
111 reliable_topic(std::shared_ptr<ringbuffer> rb,
const std::string &instance_name,
112 spi::ClientContext *context);
// Head of the MessageRunner<Listener> class template. The visible part of the
// base list contains std::enable_shared_from_this<MessageRunner<Listener>> and
// util::concurrent::Cancellable.
// NOTE(review): listing lines 115-116 (the `class MessageRunner : ...` line and
// the first base) are missing here. on_response/on_failure are marked override
// further down, so a callback base is presumably listed there -- confirm.
114 template<
typename Listener>
117 public std::enable_shared_from_this<MessageRunner<Listener>>,
118 public util::concurrent::Cancellable {
// Constructs a runner for one registered listener.
// Members are initialised from the arguments; executor_ is built from the
// ringbuffer and the logger. The starting sequence_ is
// listener.initial_sequence_id_, except that a value of -1 is replaced by
// ringbuffer_->tail_sequence().get() + 1.
// NOTE(review): listing line 122 (the remaining parameters, which must include
// `reliable_topic_config` and `lg`) and the closing braces are missing here.
// NOTE(review): `listener` is a named parameter and therefore an lvalue in
// `listener_(listener)`; if listener_ has type Listener this copy-constructs
// instead of moving -- confirm whether std::forward<Listener>(listener) was intended.
120 MessageRunner(
int id, Listener &&listener,
const std::shared_ptr<ringbuffer> &rb,
121 const std::string &topic_name, serialization::pimpl::SerializationService &service,
123 : listener_(listener), id_(id), ringbuffer_(rb), cancelled_(false), logger_(lg),
124 name_(topic_name), executor_(rb, lg), serialization_service_(service),
125 config_(reliable_topic_config) {
127 int64_t initialSequence = listener.initial_sequence_id_;
128 if (initialSequence == -1) {
129 initialSequence = ringbuffer_->tail_sequence().get() + 1;
131 sequence_ = initialSequence;
// Defaulted destructor. It is marked override, so a base class declares a
// virtual destructor.
134 ~MessageRunner()
override =
default;
// Body of the method that submits a read request to the executor.
// It fills a ReliableTopicExecutor::Message with type GET_ONE_MESSAGE, this
// runner as the callback (via shared_from_this()), the current sequence_, and
// max_count taken from config_.get_read_batch_size(), then moves it into
// executor_.execute().
// NOTE(review): the signature line and closing brace of this method are missing
// from this listing -- confirm the method name and any guard that precedes it.
141 topic::impl::reliable::ReliableTopicExecutor::Message m;
142 m.type = topic::impl::reliable::ReliableTopicExecutor::GET_ONE_MESSAGE;
143 m.callback = this->shared_from_this();
144 m.sequence = sequence_;
145 m.max_count = config_.get_read_batch_size();
146 executor_.execute(std::move(m));
// Callback invoked with the result of a read.
// For each item in all_messages->get_items() the visible code calls
// listener_.store_sequence_id_(sequence_), then extracts the item as a
// ReliableTopicMessage and passes its pointer to process(). An
// exception::iexception is caught by the handler on the last visible line.
// NOTE(review): listing lines 151-156, 158 and 162-174 are missing here,
// including the `try`, the catch body, and whatever advances sequence_ or
// requests the next read. Cancellation checks and termination handling
// therefore cannot be described from this listing.
150 void on_response(
const boost::optional<rb::read_result_set> &all_messages)
override {
157 for (
auto &item : all_messages->get_items()) {
159 listener_.store_sequence_id_(sequence_);
160 process(item.get<topic::impl::reliable::ReliableTopicMessage>().get_ptr());
161 }
catch (exception::iexception &e) {
// Callback invoked when a read fails.
// The exception pointer is rethrown so it can be classified. For an
// exception::iexception the code reads its error code and, through
// std::rethrow_if_nested, the error code of a nested iexception cause
// (causeErrorCode starts as protocol::UNDEFINED).
// Visible handling by error code:
//   - TIMEOUT: logs at finest that it continues from the last known sequence.
//   - EXECUTION whose cause is STALE_SEQUENCE: reads the ringbuffer head
//     sequence; if listener_.loss_tolerant_ it logs at finest and sets
//     sequence_ to that head, otherwise it logs a "Terminating" warning.
//   - HAZELCAST_INSTANCE_NOT_ACTIVE: logs a "Terminating" message at finest.
//   - DISTRIBUTED_OBJECT_DESTROYED: logs a "Terminating" message at finest.
//   - a final "Unhandled exception" warning that includes ie.what()
//     (presumably the fallback branch -- its `else` line is not visible).
// NOTE(review): many listing lines are missing (try lines, returns, and any
// calls that re-arm or cancel the runner), so which branches continue reading
// and which stop cannot be stated from here.
// NOTE(review): in the non-loss-tolerant warning the literals
// "...on topic: %2%" and "Reason: ..." are concatenated with no separator,
// whereas the sibling messages use "%2%. " -- looks like a missing ". ".
175 void on_failure(std::exception_ptr throwable)
override {
181 std::rethrow_exception(throwable);
182 }
catch (exception::iexception &ie) {
183 int32_t err = ie.get_error_code();
184 int32_t causeErrorCode = protocol::UNDEFINED;
186 std::rethrow_if_nested(ie);
187 }
catch (exception::iexception &causeException) {
188 causeErrorCode = causeException.get_error_code();
190 if (protocol::TIMEOUT == err) {
191 HZ_LOG(logger_, finest,
192 boost::str(boost::format(
"MessageListener on topic: %1% timed out. " 193 "Continuing from last known sequence: %2%")
198 }
else if (protocol::EXECUTION == err &&
199 protocol::STALE_SEQUENCE == causeErrorCode) {
201 int64_t remoteHeadSeq = ringbuffer_->head_sequence().get();
203 if (listener_.loss_tolerant_) {
204 HZ_LOG(logger_, finest,
205 boost::str(boost::format(
"MessageListener %1% on topic: %2% " 206 "ran into a stale sequence. " 207 "Jumping from old sequence: %3% " 209 % id_ % name_ % sequence_ % remoteHeadSeq)
211 sequence_ = remoteHeadSeq;
216 HZ_LOG(logger_, warning,
217 boost::str(boost::format(
"Terminating MessageListener: %1% on topic: %2%" 218 "Reason: The listener was too slow or the retention " 219 "period of the message has been violated. " 220 "head: %3% sequence: %4%")
221 % id_ % name_ % remoteHeadSeq % sequence_)
223 }
else if (protocol::HAZELCAST_INSTANCE_NOT_ACTIVE == err) {
224 HZ_LOG(logger_, finest,
225 boost::str(boost::format(
"Terminating MessageListener %1% on topic: %2%. " 226 "Reason: HazelcastInstance is shutting down")
229 }
else if (protocol::DISTRIBUTED_OBJECT_DESTROYED == err) {
230 HZ_LOG(logger_, finest,
231 boost::str(boost::format(
"Terminating MessageListener %1% on topic: %2%. " 232 "Reason: Topic is destroyed")
236 HZ_LOG(logger_, warning,
237 boost::str(boost::format(
"Terminating MessageListener %1% on topic: %2%. " 238 "Reason: Unhandled exception, details: %3%")
239 % id_ % name_ % ie.what())
// Cancellable override: sets the atomic cancelled_ flag to true, then returns
// the result of executor_.stop().
247 bool cancel()
override {
248 cancelled_.store(
true);
249 return executor_.stop();
// Cancellable override: returns the current value of the atomic cancelled_ flag.
252 bool is_cancelled()
override {
253 return cancelled_.load();
// Converts the reliable-topic message with to_message() and passes the
// resulting topic::message to listener_.received_.
// NOTE(review): listing line 257 and the closing brace are missing here.
256 void process(topic::impl::reliable::ReliableTopicMessage *message) {
258 listener_.received_(to_message(message));
// Builds a topic::message from a ReliableTopicMessage. The result carries the
// topic name (name_), the payload wrapped in typed_data together with
// serialization_service_, the publish time, and an optional member that is
// set only when the message has a publisher address.
// NOTE(review): std::move(message->get_payload()) only moves if get_payload()
// returns a non-const reference or a value; if it does move, the payload held
// by *message is left in a moved-from state -- confirm this is intended.
261 topic::message to_message(topic::impl::reliable::ReliableTopicMessage *message) {
262 boost::optional<member> m;
263 auto &addr = message->get_publisher_address();
264 if (addr.has_value()) {
265 m = boost::make_optional<member>(addr.value());
267 return topic::message(name_, typed_data(std::move(message->get_payload()), serialization_service_),
268 message->get_publish_time(), std::move(m));
// Decides whether the runner should stop after `failure`.
// The decision is obtained from listener_.terminal_(failure). Two log
// statements are visible: a warning "Terminating ReliableListener ..." and a
// finest-level "... ran into an exception ..." (presumably for the terminate
// and keep-going outcomes respectively -- the branch lines are not visible).
// If calling terminal_ itself throws an exception::iexception, a warning is
// logged that names ReliableListener::terminate_on_exception as the source.
// NOTE(review): the `try`, the if/else lines and all return statements are
// missing from this listing, so the returned value on each path cannot be
// stated from here.
271 bool terminate(
const exception::iexception &failure) {
277 bool terminate = listener_.terminal_(failure);
279 HZ_LOG(logger_, warning,
280 boost::str(boost::format(
"Terminating ReliableListener %1% " 282 "Reason: Unhandled exception, details: %3%")
283 % id_ % name_ % failure.what())
286 HZ_LOG(logger_, finest,
287 boost::str(boost::format(
"ReliableListener %1% on topic: %2% " 288 "ran into an exception, details: %3%")
289 % id_ % name_ % failure.what())
293 }
catch (exception::iexception &t) {
294 HZ_LOG(logger_, warning,
295 boost::str(boost::format(
"Terminating ReliableListener %1% on topic: %2%. " 296 "Reason: Unhandled exception while calling the function set by " 297 "ReliableListener::terminate_on_exception. %3%")
298 % id_ % name_ % t.what())
// Data members of MessageRunner visible in this listing (others, such as
// listener_, id_, sequence_ and logger_, are on lines missing here).
// name_, serialization_service_ and config_ are reference members: the objects
// bound to them in the constructor must outlive the runner.
307 std::shared_ptr<ringbuffer> ringbuffer_;
// Set to true by cancel() and read by is_cancelled().
309 std::atomic<bool> cancelled_;
311 const std::string &name_;
312 topic::impl::reliable::ReliableTopicExecutor executor_;
313 serialization::pimpl::SerializationService &serialization_service_;
314 const config::reliable_topic_config &config_;
// Registered runners keyed by their integer id, stored as Cancellable;
// add_message_listener inserts into it with put(id, runner).
317 util::SynchronizedMap<int, util::concurrent::Cancellable> runners_map_;
// Atomic id source, starting at 0. add_message_listener pre-increments it, so
// the first id handed out is 1.
318 std::atomic<int> runner_counter_{ 0 };
323 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64) Definition: hazelcast_client.h:62
Hazelcast provides a distribution mechanism for publishing messages that are delivered to multiple subscribers...
Definition: reliable_topic.h:51
boost::future< void > publish(const E &message)
Publishes the message to all subscribers of this topic. The current implementation only supports DISCARD_O...
Definition: reliable_topic.h:66
std::string add_message_listener(Listener &&listener)
Subscribes to this topic.
Definition: reliable_topic.h:88
execution_callback allows you to be notified asynchronously when the execution completes, either successfully or with an error.
Definition: execution_callback.h:40
Definition: reliable_topic_config.h:29