Hazelcast C++ Client
Hazelcast C++ Client Library
reliable_topic.h
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <string>
19 #include <memory>
20 #include <atomic>
21 
22 #include "hazelcast/client/ringbuffer.h"
23 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
24 #include "hazelcast/util/Preconditions.h"
25 #include "hazelcast/util/concurrent/Cancellable.h"
26 #include "hazelcast/logger.h"
27 #include "hazelcast/client/topic/impl/reliable/ReliableTopicMessage.h"
28 
29 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
30 #pragma warning(push)
31 #pragma warning(disable : 4251) // for dll export
32 #endif
33 
34 namespace hazelcast {
35 namespace client {
36 
52 class HAZELCAST_API reliable_topic : public proxy::ProxyImpl
53 {
54  friend class spi::ProxyManager;
55  friend class hazelcast_client;
56 
57 public:
58  static constexpr const char* SERVICE_NAME = "hz:impl:reliableTopicService";
59 
69  template<typename E>
70  boost::future<void> publish(const E& message)
71  {
72  topic::impl::reliable::ReliableTopicMessage reliable_message(
73  to_data(message), nullptr);
74  return to_void_future(ringbuffer_.get()->add(reliable_message));
75  }
76 
95  template<typename Listener>
96  std::string add_message_listener(Listener&& listener)
97  {
98  int id = ++runner_counter_;
99  std::shared_ptr<MessageRunner<Listener>> runner(
100  new MessageRunner<Listener>(id,
101  std::forward<Listener>(listener),
102  ringbuffer_.get(),
103  get_name(),
104  get_serialization_service(),
105  batch_size_,
106  logger_,
107  executor_,
108  runners_map_));
109  runners_map_.put(id, runner);
110  runner->next();
111  return std::to_string(id);
112  }
113 
122  bool remove_message_listener(const std::string& registration_id);
123 
124 protected:
125  void on_shutdown() override;
126 
127  void on_destroy() override;
128 
129  void post_destroy() override;
130 
131 private:
132  static constexpr const char* TOPIC_RB_PREFIX = "_hz_rb_";
133 
134  reliable_topic(const std::string& instance_name,
135  spi::ClientContext* context);
136 
137  template<typename Listener>
138  class MessageRunner
139  : public std::enable_shared_from_this<MessageRunner<Listener>>
140  , public util::concurrent::Cancellable
141  {
142  public:
143  MessageRunner(int id,
144  Listener&& listener,
145  const std::shared_ptr<ringbuffer>& rb,
146  const std::string& topic_name,
147  serialization::pimpl::SerializationService& service,
148  int batch_size,
149  logger& lg,
150  util::hz_thread_pool& executor,
151  util::SynchronizedMap<int, util::concurrent::Cancellable>&
152  runners_map)
153  : listener_(listener)
154  , id_(id)
155  , ringbuffer_(rb)
156  , cancelled_(false)
157  , logger_(lg)
158  , name_(topic_name)
159  , executor_(executor)
160  , serialization_service_(service)
161  , batch_size_(batch_size)
162  , runners_map_(runners_map)
163  {
164  // we are going to listen to next publication. We don't care about
165  // what already has been published.
166  int64_t initialSequence = listener.initial_sequence_id_;
167  if (initialSequence == -1) {
168  initialSequence = ringbuffer_->tail_sequence().get() + 1;
169  }
170  sequence_ = initialSequence;
171  }
172 
173  virtual ~MessageRunner() = default;
174 
175  void next()
176  {
177  if (cancelled_) {
178  return;
179  }
180 
181  auto runner = this->shared_from_this();
182  ringbuffer_->read_many(sequence_, 1, batch_size_)
183  .then(executor_, [runner](boost::future<rb::read_result_set> f) {
184  if (runner->cancelled_) {
185  return;
186  }
187 
188  try {
189  auto result = f.get();
190 
191  // we process all messages in batch. So we don't release
192  // the thread and reschedule ourselves; but we'll process
193  // whatever was received in 1 go.
194  auto lost_count =
195  result.get_next_sequence_to_read_from() -
196  result.read_count() - runner->sequence_;
197  if (lost_count != 0 &&
198  !runner->listener_.loss_tolerant_) {
199  runner->cancel();
200  return;
201  }
202 
203  auto const& items = result.get_items();
204  for (size_t i = 0; i < items.size(); i++) {
205  auto const& message = items[i];
206  try {
207  runner->listener_.store_sequence_id_(
208  result.get_sequence(static_cast<int>(i)));
209  auto rel_msg = message.get<
210  topic::impl::reliable::ReliableTopicMessage>();
211  runner->process(*rel_msg);
212  } catch (exception::iexception& e) {
213  if (runner->terminate(e)) {
214  runner->cancel();
215  return;
216  }
217  }
218  }
219 
220  runner->sequence_ =
221  result.get_next_sequence_to_read_from();
222  runner->next();
223  } catch (exception::iexception& ie) {
224  if (runner->handle_internal_exception(ie)) {
225  runner->next();
226  } else {
227  runner->cancel();
228  }
229  }
230  });
231  }
232 
233  bool handle_operation_timeout_exception()
234  {
235  HZ_LOG(logger_,
236  finest,
237  boost::str(
238  boost::format("MessageListener on topic: %1% timed out. "
239  "Continuing from last known sequence: %2%") %
240  name_ % sequence_));
241 
242  return true;
243  }
244 
255  bool handle_illegal_argument_exception(exception::iexception& e)
256  {
257  // stale_sequence_exception.getHeadSeq() is not available on the
258  // client-side, see #7317
259  int64_t remoteHeadSeq = ringbuffer_->head_sequence().get();
260 
261  if (listener_.loss_tolerant_) {
262  HZ_LOG(logger_,
263  finest,
264  boost::str(
265  boost::format(
266  "Terminating MessageListener: %1% on topic: %2% ."
267  "Reason: Underlying ring buffer data related to "
268  "reliable topic is lost.") %
269  id_ % name_ % e.what() % sequence_ % remoteHeadSeq));
270  sequence_ = remoteHeadSeq;
271  return true;
272  }
273 
274  HZ_LOG(
275  logger_,
276  warning,
277  boost::str(boost::format(
278  "Terminating MessageListener: %1% on topic: %2%"
279  "Reason: The listener was too slow or the retention "
280  "period of the message has been violated. "
281  "head: %3% sequence: %4%") %
282  id_ % name_ % remoteHeadSeq % sequence_));
283  return false;
284  }
285 
292  bool handle_internal_exception(exception::iexception& ie)
293  {
294  int32_t err = ie.get_error_code();
295 
296  switch (err) {
297  case protocol::TIMEOUT:
298  return handle_operation_timeout_exception();
299  case protocol::ILLEGAL_ARGUMENT:
300  return handle_illegal_argument_exception(ie);
301  case protocol::HAZELCAST_INSTANCE_NOT_ACTIVE:
302  HZ_LOG(logger_,
303  finest,
304  boost::str(
305  boost::format(
306  "Terminating MessageListener %1% on topic: %2%. "
307  "Reason: HazelcastInstance is shutting down") %
308  id_ % name_));
309  case protocol::DISTRIBUTED_OBJECT_DESTROYED:
310  HZ_LOG(logger_,
311  finest,
312  boost::str(
313  boost::format(
314  "Terminating MessageListener %1% on topic: %2%. "
315  "Reason: Topic is destroyed") %
316  id_ % name_));
317  default:
318  HZ_LOG(logger_,
319  warning,
320  boost::str(
321  boost::format(
322  "Terminating MessageListener %1% on topic: %2%. "
323  "Reason: Unhandled exception, details: %3%") %
324  id_ % name_ % ie.what()));
325  }
326  return false;
327  }
328 
329  bool cancel() override
330  {
331  cancelled_.store(true);
332  runners_map_.remove(id_);
333  return true;
334  }
335 
336  bool is_cancelled() override { return cancelled_.load(); }
337 
338  private:
339  void process(topic::impl::reliable::ReliableTopicMessage& message)
340  {
341  listener_.received_(to_message(message));
342  }
343 
344  topic::message to_message(
345  topic::impl::reliable::ReliableTopicMessage& message)
346  {
347  boost::optional<member> m;
348  auto& addr = message.get_publisher_address();
349  if (addr.has_value()) {
350  m = boost::make_optional<member>(addr.value());
351  }
352  return topic::message(name_,
353  typed_data(std::move(message.get_payload()),
354  serialization_service_),
355  message.get_publish_time(),
356  std::move(m));
357  }
358 
359  bool terminate(const exception::iexception& failure)
360  {
361  if (cancelled_) {
362  return true;
363  }
364 
365  try {
366  bool terminate = listener_.terminal_(failure);
367  if (terminate) {
368  HZ_LOG(logger_,
369  warning,
370  boost::str(
371  boost::format(
372  "Terminating ReliableListener %1% "
373  "on topic: %2%. "
374  "Reason: Unhandled exception, details: %3%") %
375  id_ % name_ % failure.what()));
376  } else {
377  HZ_LOG(logger_,
378  finest,
379  boost::str(boost::format(
380  "ReliableListener %1% on topic: %2% "
381  "ran into an exception, details: %3%") %
382  id_ % name_ % failure.what()));
383  }
384  return terminate;
385  } catch (exception::iexception& t) {
386  HZ_LOG(logger_,
387  warning,
388  boost::str(
389  boost::format(
390  "Terminating ReliableListener %1% on topic: %2%. "
391  "Reason: Unhandled exception while calling the "
392  "function set by "
393  "ReliableListener::terminate_on_exception. %3%") %
394  id_ % name_ % t.what()));
395  return true;
396  }
397  }
398 
399  private:
400  Listener listener_;
401  int id_;
402  std::shared_ptr<ringbuffer> ringbuffer_;
403  int64_t sequence_;
404  std::atomic<bool> cancelled_;
405  logger& logger_;
406  const std::string& name_;
407  util::hz_thread_pool& executor_;
408  serialization::pimpl::SerializationService& serialization_service_;
409  int batch_size_;
410  util::SynchronizedMap<int, util::concurrent::Cancellable>& runners_map_;
411  };
412 
413  util::SynchronizedMap<int, util::concurrent::Cancellable> runners_map_;
414  std::atomic<int> runner_counter_{ 0 };
415  util::hz_thread_pool& executor_;
416  logger& logger_;
417  int batch_size_;
418  boost::shared_future<std::shared_ptr<ringbuffer>> ringbuffer_;
419 };
420 } // namespace client
421 } // namespace hazelcast
422 
423 #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
424 #pragma warning(pop)
425 #endif
Hazelcast provides a distribution mechanism for publishing messages that are delivered to multiple subs...
std::string add_message_listener(Listener &&listener)
Subscribes to this topic.
boost::future< void > publish(const E &message)
Publishes the message to all subscribers of this topic. The current implementation only supports DISCARD_O...