Hazelcast C++ Client
reliable_topic.h
1 /*
2  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <string>
19 #include <memory>
20 #include <atomic>
21 
22 #include "hazelcast/client/proxy/ReliableTopicImpl.h"
23 #include "hazelcast/client/ringbuffer.h"
24 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
25 #include "hazelcast/util/Preconditions.h"
26 #include "hazelcast/util/concurrent/Cancellable.h"
27 #include "hazelcast/logger.h"
28 #include "hazelcast/client/topic/impl/reliable/ReliableTopicMessage.h"
29 #include "hazelcast/client/topic/impl/reliable/ReliableTopicExecutor.h"
30 
31 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
32 #pragma warning(push)
33 #pragma warning(disable: 4251) //for dll export
34 #endif
35 
36 namespace hazelcast {
37  namespace client {
38 
51  class HAZELCAST_API reliable_topic : public proxy::ReliableTopicImpl {
52  friend class spi::ProxyManager;
53  friend class hazelcast_client;
54  public:
55  static constexpr const char *SERVICE_NAME = "hz:impl:topicService";
56 
65  template<typename E>
66  boost::future<void> publish(const E &message) {
67  return proxy::ReliableTopicImpl::publish(to_data(message));
68  }
69 
87  template<typename Listener>
88  std::string add_message_listener(Listener &&listener) {
89  int id = ++runner_counter_;
90  std::shared_ptr<MessageRunner < Listener>>
91  runner(new MessageRunner<Listener>(id, std::forward<Listener>(listener), ringbuffer_, get_name(),
92  get_serialization_service(), config_, logger_));
93  runners_map_.put(id, runner);
94  runner->next();
95  return std::to_string(id);
96  }
97 
106  bool remove_message_listener(const std::string &registration_id);
107  protected:
108  void on_destroy() override;
109 
110  private:
111  reliable_topic(std::shared_ptr<ringbuffer> rb, const std::string &instance_name,
112  spi::ClientContext *context);
113 
114  template<typename Listener>
115  class MessageRunner
116  : public execution_callback<rb::read_result_set>,
117  public std::enable_shared_from_this<MessageRunner<Listener>>,
118  public util::concurrent::Cancellable {
119  public:
120  MessageRunner(int id, Listener &&listener, const std::shared_ptr<ringbuffer> &rb,
121  const std::string &topic_name, serialization::pimpl::SerializationService &service,
122  const config::reliable_topic_config &reliable_topic_config, logger &lg)
123  : listener_(listener), id_(id), ringbuffer_(rb), cancelled_(false), logger_(lg),
124  name_(topic_name), executor_(rb, lg), serialization_service_(service),
125  config_(reliable_topic_config) {
126  // we are going to listen to next publication. We don't care about what already has been published.
127  int64_t initialSequence = listener.initial_sequence_id_;
128  if (initialSequence == -1) {
129  initialSequence = ringbuffer_->tail_sequence().get() + 1;
130  }
131  sequence_ = initialSequence;
132  }
133 
134  ~MessageRunner() override = default;
135 
136  void next() {
137  if (cancelled_) {
138  return;
139  }
140 
141  topic::impl::reliable::ReliableTopicExecutor::Message m;
142  m.type = topic::impl::reliable::ReliableTopicExecutor::GET_ONE_MESSAGE;
143  m.callback = this->shared_from_this();
144  m.sequence = sequence_;
145  m.max_count = config_.get_read_batch_size();
146  executor_.execute(std::move(m));
147  }
148 
149  // This method is called from the provided executor.
150  void on_response(const boost::optional<rb::read_result_set> &all_messages) override {
151  if (cancelled_) {
152  return;
153  }
154 
155  // we process all messages in batch. So we don't release the thread and reschedule ourselves;
156  // but we'll process whatever was received in 1 go.
157  for (auto &item : all_messages->get_items()) {
158  try {
159  listener_.store_sequence_id_(sequence_);
160  process(item.get<topic::impl::reliable::ReliableTopicMessage>().get_ptr());
161  } catch (exception::iexception &e) {
162  if (terminate(e)) {
163  cancel();
164  return;
165  }
166  }
167 
168  sequence_++;
169  }
170 
171  next();
172  }
173 
174  // This method is called from the provided executor.
175  void on_failure(std::exception_ptr throwable) override {
176  if (cancelled_) {
177  return;
178  }
179 
180  try {
181  std::rethrow_exception(throwable);
182  } catch (exception::iexception &ie) {
183  int32_t err = ie.get_error_code();
184  int32_t causeErrorCode = protocol::UNDEFINED;
185  try {
186  std::rethrow_if_nested(ie);
187  } catch (exception::iexception &causeException) {
188  causeErrorCode = causeException.get_error_code();
189  }
190  if (protocol::TIMEOUT == err) {
191  HZ_LOG(logger_, finest,
192  boost::str(boost::format("MessageListener on topic: %1% timed out. "
193  "Continuing from last known sequence: %2%")
194  % name_ % sequence_)
195  );
196  next();
197  return;
198  } else if (protocol::EXECUTION == err &&
199  protocol::STALE_SEQUENCE == causeErrorCode) {
200  // stale_sequence_exception.getHeadSeq() is not available on the client-side, see #7317
201  int64_t remoteHeadSeq = ringbuffer_->head_sequence().get();
202 
203  if (listener_.loss_tolerant_) {
204  HZ_LOG(logger_, finest,
205  boost::str(boost::format("MessageListener %1% on topic: %2% "
206  "ran into a stale sequence. "
207  "Jumping from old sequence: %3% "
208  "to sequence: %4%")
209  % id_ % name_ % sequence_ % remoteHeadSeq)
210  );
211  sequence_ = remoteHeadSeq;
212  next();
213  return;
214  }
215 
216  HZ_LOG(logger_, warning,
217  boost::str(boost::format("Terminating MessageListener: %1% on topic: %2%"
218  "Reason: The listener was too slow or the retention "
219  "period of the message has been violated. "
220  "head: %3% sequence: %4%")
221  % id_ % name_ % remoteHeadSeq % sequence_)
222  );
223  } else if (protocol::HAZELCAST_INSTANCE_NOT_ACTIVE == err) {
224  HZ_LOG(logger_, finest,
225  boost::str(boost::format("Terminating MessageListener %1% on topic: %2%. "
226  "Reason: HazelcastInstance is shutting down")
227  % id_ % name_)
228  );
229  } else if (protocol::DISTRIBUTED_OBJECT_DESTROYED == err) {
230  HZ_LOG(logger_, finest,
231  boost::str(boost::format("Terminating MessageListener %1% on topic: %2%. "
232  "Reason: Topic is destroyed")
233  % id_ % name_)
234  );
235  } else {
236  HZ_LOG(logger_, warning,
237  boost::str(boost::format("Terminating MessageListener %1% on topic: %2%. "
238  "Reason: Unhandled exception, details: %3%")
239  % id_ % name_ % ie.what())
240  );
241  }
242 
243  cancel();
244  }
245  }
246 
247  bool cancel() override {
248  cancelled_.store(true);
249  return executor_.stop();
250  }
251 
252  bool is_cancelled() override {
253  return cancelled_.load();
254  }
255  private:
256  void process(topic::impl::reliable::ReliableTopicMessage *message) {
257  // proxy.localTopicStats.incrementReceives();
258  listener_.received_(to_message(message));
259  }
260 
261  topic::message to_message(topic::impl::reliable::ReliableTopicMessage *message) {
262  boost::optional<member> m;
263  auto &addr = message->get_publisher_address();
264  if (addr.has_value()) {
265  m = boost::make_optional<member>(addr.value());
266  }
267  return topic::message(name_, typed_data(std::move(message->get_payload()), serialization_service_),
268  message->get_publish_time(), std::move(m));
269  }
270 
271  bool terminate(const exception::iexception &failure) {
272  if (cancelled_) {
273  return true;
274  }
275 
276  try {
277  bool terminate = listener_.terminal_(failure);
278  if (terminate) {
279  HZ_LOG(logger_, warning,
280  boost::str(boost::format("Terminating ReliableListener %1% "
281  "on topic: %2%. "
282  "Reason: Unhandled exception, details: %3%")
283  % id_ % name_ % failure.what())
284  );
285  } else {
286  HZ_LOG(logger_, finest,
287  boost::str(boost::format("ReliableListener %1% on topic: %2% "
288  "ran into an exception, details: %3%")
289  % id_ % name_ % failure.what())
290  );
291  }
292  return terminate;
293  } catch (exception::iexception &t) {
294  HZ_LOG(logger_, warning,
295  boost::str(boost::format("Terminating ReliableListener %1% on topic: %2%. "
296  "Reason: Unhandled exception while calling the function set by "
297  "ReliableListener::terminate_on_exception. %3%")
298  % id_ % name_ % t.what())
299  );
300  return true;
301  }
302  }
303 
304  private:
305  Listener listener_;
306  int id_;
307  std::shared_ptr<ringbuffer> ringbuffer_;
308  int64_t sequence_;
309  std::atomic<bool> cancelled_;
310  logger &logger_;
311  const std::string &name_;
312  topic::impl::reliable::ReliableTopicExecutor executor_;
313  serialization::pimpl::SerializationService &serialization_service_;
314  const config::reliable_topic_config &config_;
315  };
316 
317  util::SynchronizedMap<int, util::concurrent::Cancellable> runners_map_;
318  std::atomic<int> runner_counter_{ 0 };
319  };
320  }
321 }
322 
323 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
324 #pragma warning(pop)
325 #endif
Definition: hazelcast_client.h:62
Hazelcast provides a distribution mechanism for publishing messages that are delivered to multiple subs...
Definition: reliable_topic.h:51
Definition: logger.h:26
boost::future< void > publish(const E &message)
Publishes the message to all subscribers of this topic. Current implementation only supports DISCARD_O...
Definition: reliable_topic.h:66
std::string add_message_listener(Listener &&listener)
Subscribes to this topic.
Definition: reliable_topic.h:88
Definition: address.h:30
execution_callback allows to asynchronously get notified when the execution is completed, either successfully or with error.
Definition: execution_callback.h:40
Definition: reliable_topic_config.h:29