Hazelcast C++ Client
Hazelcast C++ Client Library
reliable_topic.h
1 /*
2  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <string>
19 #include <memory>
20 #include <atomic>
21 
22 #include "hazelcast/client/ringbuffer.h"
23 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
24 #include "hazelcast/util/Preconditions.h"
25 #include "hazelcast/util/concurrent/Cancellable.h"
26 #include "hazelcast/logger.h"
27 #include "hazelcast/client/topic/impl/reliable/ReliableTopicMessage.h"
28 
29 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
30 #pragma warning(push)
31 #pragma warning(disable: 4251) //for dll export
32 #endif
33 
34 namespace hazelcast {
35  namespace client {
36 
49  class HAZELCAST_API reliable_topic : public proxy::ProxyImpl {
50  friend class spi::ProxyManager;
51  friend class hazelcast_client;
52  public:
53  static constexpr const char *SERVICE_NAME = "hz:impl:reliableTopicService";
54 
63  template<typename E>
64  boost::future<void> publish(const E &message) {
65  topic::impl::reliable::ReliableTopicMessage reliable_message(to_data(message), nullptr);
66  return to_void_future(ringbuffer_.get()->add(reliable_message));
67  }
68 
86  template<typename Listener>
87  std::string add_message_listener(Listener &&listener) {
88  int id = ++runner_counter_;
89  std::shared_ptr<MessageRunner < Listener>>
90  runner(new MessageRunner<Listener>(id, std::forward<Listener>(listener), ringbuffer_.get(), get_name(),
91  get_serialization_service(), batch_size_, logger_, executor_,
92  runners_map_));
93  runners_map_.put(id, runner);
94  runner->next();
95  return std::to_string(id);
96  }
97 
106  bool remove_message_listener(const std::string &registration_id);
107  protected:
108  void on_shutdown() override;
109 
110  void on_destroy() override;
111 
112  void post_destroy() override;
113 
114  private:
115  static constexpr const char *TOPIC_RB_PREFIX = "_hz_rb_";
116 
117  reliable_topic(const std::string &instance_name, spi::ClientContext *context);
118 
119  template<typename Listener>
120  class MessageRunner
121  : public std::enable_shared_from_this<MessageRunner<Listener>>,
122  public util::concurrent::Cancellable {
123  public:
124  MessageRunner(int id, Listener &&listener, const std::shared_ptr<ringbuffer> &rb,
125  const std::string &topic_name, serialization::pimpl::SerializationService &service,
126  int batch_size, logger &lg, util::hz_thread_pool &executor,
127  util::SynchronizedMap<int, util::concurrent::Cancellable> &runners_map)
128  : listener_(listener), id_(id), ringbuffer_(rb), cancelled_(false), logger_(lg),
129  name_(topic_name), executor_(executor), serialization_service_(service),
130  batch_size_(batch_size), runners_map_(runners_map) {
131  // we are going to listen to next publication. We don't care about what already has been published.
132  int64_t initialSequence = listener.initial_sequence_id_;
133  if (initialSequence == -1) {
134  initialSequence = ringbuffer_->tail_sequence().get() + 1;
135  }
136  sequence_ = initialSequence;
137  }
138 
139  virtual ~MessageRunner() = default;
140 
141  void next() {
142  if (cancelled_) {
143  return;
144  }
145 
146  auto runner = this->shared_from_this();
147  ringbuffer_->read_many(sequence_, 1, batch_size_).then(executor_, [runner](
148  boost::future<rb::read_result_set> f) {
149  if (runner->cancelled_) {
150  return;
151  }
152 
153  try {
154  auto result = f.get();
155 
156  // we process all messages in batch. So we don't release the thread and reschedule ourselves;
157  // but we'll process whatever was received in 1 go.
158  auto lost_count = result.get_next_sequence_to_read_from() - result.read_count() - runner->sequence_;
159  if (lost_count != 0 && !runner->listener_.loss_tolerant_) {
160  runner->cancel();
161  return;
162  }
163 
164  auto const &items = result.get_items();
165  for (size_t i = 0; i < items.size(); i++) {
166  auto const &message = items[i];
167  try {
168  runner->listener_.store_sequence_id_(result.get_sequence(static_cast<int>(i)));
169  auto rel_msg = message.get<topic::impl::reliable::ReliableTopicMessage>();
170  runner->process(*rel_msg);
171  } catch (exception::iexception &e) {
172  if (runner->terminate(e)) {
173  runner->cancel();
174  return;
175  }
176  }
177  }
178 
179  runner->sequence_ = result.get_next_sequence_to_read_from();
180  runner->next();
181  } catch (exception::iexception &ie) {
182  if (runner->handle_internal_exception(ie)) {
183  runner->next();
184  } else {
185  runner->cancel();
186  }
187  }
188 
189  });
190  }
191 
192  bool handle_operation_timeout_exception() {
193  HZ_LOG(logger_, finest,
194  boost::str(boost::format("MessageListener on topic: %1% timed out. "
195  "Continuing from last known sequence: %2%")
196  % name_ % sequence_)
197  );
198 
199  return true;
200  }
201 
211  bool handle_illegal_argument_exception(exception::iexception &e) {
212  // stale_sequence_exception.getHeadSeq() is not available on the client-side, see #7317
213  int64_t remoteHeadSeq = ringbuffer_->head_sequence().get();
214 
215  if (listener_.loss_tolerant_) {
216  HZ_LOG(logger_, finest,
217  boost::str(boost::format("Terminating MessageListener: %1% on topic: %2% ."
218  "Reason: Underlying ring buffer data related to reliable topic is lost.")
219  % id_ % name_ %e.what() % sequence_ % remoteHeadSeq)
220  );
221  sequence_ = remoteHeadSeq;
222  return true;
223  }
224 
225  HZ_LOG(logger_, warning,
226  boost::str(boost::format("Terminating MessageListener: %1% on topic: %2%"
227  "Reason: The listener was too slow or the retention "
228  "period of the message has been violated. "
229  "head: %3% sequence: %4%")
230  % id_ % name_ % remoteHeadSeq % sequence_)
231  );
232  return false;
233  }
234 
239  bool handle_internal_exception(exception::iexception &ie) {
240  int32_t err = ie.get_error_code();
241 
242  switch(err) {
243  case protocol::TIMEOUT:
244  return handle_operation_timeout_exception();
245  case protocol::ILLEGAL_ARGUMENT:
246  return handle_illegal_argument_exception(ie);
247  case protocol::HAZELCAST_INSTANCE_NOT_ACTIVE:
248  HZ_LOG(logger_, finest,
249  boost::str(boost::format("Terminating MessageListener %1% on topic: %2%. "
250  "Reason: HazelcastInstance is shutting down")
251  % id_ % name_)
252  );
253  case protocol::DISTRIBUTED_OBJECT_DESTROYED:
254  HZ_LOG(logger_, finest,
255  boost::str(boost::format("Terminating MessageListener %1% on topic: %2%. "
256  "Reason: Topic is destroyed")
257  % id_ % name_)
258  );
259  default:
260  HZ_LOG(logger_, warning,
261  boost::str(boost::format("Terminating MessageListener %1% on topic: %2%. "
262  "Reason: Unhandled exception, details: %3%")
263  % id_ % name_ % ie.what())
264  );
265  }
266  return false;
267  }
268 
269  bool cancel() override {
270  cancelled_.store(true);
271  runners_map_.remove(id_);
272  return true;
273  }
274 
275  bool is_cancelled() override {
276  return cancelled_.load();
277  }
278  private:
279  void process(topic::impl::reliable::ReliableTopicMessage &message) {
280  listener_.received_(to_message(message));
281  }
282 
283  topic::message to_message(topic::impl::reliable::ReliableTopicMessage &message) {
284  boost::optional<member> m;
285  auto &addr = message.get_publisher_address();
286  if (addr.has_value()) {
287  m = boost::make_optional<member>(addr.value());
288  }
289  return topic::message(name_, typed_data(std::move(message.get_payload()), serialization_service_),
290  message.get_publish_time(), std::move(m));
291  }
292 
293  bool terminate(const exception::iexception &failure) {
294  if (cancelled_) {
295  return true;
296  }
297 
298  try {
299  bool terminate = listener_.terminal_(failure);
300  if (terminate) {
301  HZ_LOG(logger_, warning,
302  boost::str(boost::format("Terminating ReliableListener %1% "
303  "on topic: %2%. "
304  "Reason: Unhandled exception, details: %3%")
305  % id_ % name_ % failure.what())
306  );
307  } else {
308  HZ_LOG(logger_, finest,
309  boost::str(boost::format("ReliableListener %1% on topic: %2% "
310  "ran into an exception, details: %3%")
311  % id_ % name_ % failure.what())
312  );
313  }
314  return terminate;
315  } catch (exception::iexception &t) {
316  HZ_LOG(logger_, warning,
317  boost::str(boost::format("Terminating ReliableListener %1% on topic: %2%. "
318  "Reason: Unhandled exception while calling the function set by "
319  "ReliableListener::terminate_on_exception. %3%")
320  % id_ % name_ % t.what())
321  );
322  return true;
323  }
324  }
325 
326  private:
327  Listener listener_;
328  int id_;
329  std::shared_ptr<ringbuffer> ringbuffer_;
330  int64_t sequence_;
331  std::atomic<bool> cancelled_;
332  logger &logger_;
333  const std::string &name_;
334  util::hz_thread_pool &executor_;
335  serialization::pimpl::SerializationService &serialization_service_;
336  int batch_size_;
337  util::SynchronizedMap<int, util::concurrent::Cancellable> &runners_map_;
338  };
339 
340  util::SynchronizedMap<int, util::concurrent::Cancellable> runners_map_;
341  std::atomic<int> runner_counter_{ 0 };
342  util::hz_thread_pool &executor_;
343  logger &logger_;
344  int batch_size_;
345  boost::shared_future<std::shared_ptr<ringbuffer>> ringbuffer_;
346  };
347  }
348 }
349 
350 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
351 #pragma warning(pop)
352 #endif
Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subs...
std::string add_message_listener(Listener &&listener)
Subscribes to this topic.
boost::future< void > publish(const E &message)
Publishes the message to all subscribers of this topic Current implementation only supports DISCARD_O...