Hazelcast C++ Client
Hazelcast C++ Client Library
Loading...
Searching...
No Matches
reliable_topic.h
1/*
2 * Copyright (c) 2008-2025, Hazelcast, Inc. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#pragma once
17
#include <atomic>
#include <memory>
#include <string>
#include <type_traits>
21
22#include "hazelcast/client/ringbuffer.h"
23#include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
24#include "hazelcast/util/Preconditions.h"
25#include "hazelcast/util/concurrent/Cancellable.h"
26#include "hazelcast/logger.h"
27#include "hazelcast/client/topic/impl/reliable/ReliableTopicMessage.h"
28
29#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
30#pragma warning(push)
31#pragma warning(disable : 4251) // for dll export
32#endif
33
34namespace hazelcast {
35namespace client {
36
52class HAZELCAST_API reliable_topic
53 : public proxy::ProxyImpl
54 , public std::enable_shared_from_this<reliable_topic>
55{
56 friend class spi::ProxyManager;
57 friend class hazelcast_client;
58
59public:
60 static constexpr const char* SERVICE_NAME = "hz:impl:reliableTopicService";
61
71 template<typename E>
72 boost::future<void> publish(const E& message)
73 {
74 topic::impl::reliable::ReliableTopicMessage reliable_message(
75 to_data(message), nullptr);
76 return to_void_future(ringbuffer_.get()->add(reliable_message));
77 }
78
97 template<typename Listener>
98 std::string add_message_listener(Listener&& listener)
99 {
100 int id = ++runner_counter_;
101 std::shared_ptr<MessageRunner<Listener>> runner(
102 new MessageRunner<Listener>(id,
103 std::forward<Listener>(listener),
104 ringbuffer_.get(),
105 get_name(),
106 get_serialization_service(),
107 batch_size_,
108 logger_,
109 execution_service_,
110 executor_,
111 shared_from_this()));
112 runners_map_.put(id, runner);
113 runner->next();
114 return std::to_string(id);
115 }
116
125 bool remove_message_listener(const std::string& registration_id);
126
127protected:
128 void on_shutdown() override;
129
130 void on_destroy() override;
131
132 void post_destroy() override;
133
134private:
135 static constexpr const char* TOPIC_RB_PREFIX = "_hz_rb_";
136
137 reliable_topic(const std::string& instance_name,
138 spi::ClientContext* context);
139
140 template<typename Listener>
141 class MessageRunner
142 : public std::enable_shared_from_this<MessageRunner<Listener>>
143 , public util::concurrent::Cancellable
144 {
145 public:
146 MessageRunner(int id,
147 Listener&& listener,
148 const std::shared_ptr<ringbuffer>& rb,
149 const std::string& topic_name,
150 serialization::pimpl::SerializationService& service,
151 int batch_size,
152 logger& lg,
153 std::shared_ptr<spi::impl::ClientExecutionServiceImpl>
154 execution_service,
155 util::hz_thread_pool& executor,
156 std::weak_ptr<reliable_topic> topic)
157 : listener_(listener)
158 , id_(id)
159 , ringbuffer_(rb)
160 , cancelled_(false)
161 , logger_(lg)
162 , name_(topic_name)
163 , execution_service_(std::move(execution_service))
164 , executor_(executor)
165 , serialization_service_(service)
166 , batch_size_(batch_size)
167 , topic_(std::move(topic))
168 {
169 // we are going to listen to next publication. We don't care about
170 // what already has been published.
171 int64_t initialSequence = listener.initial_sequence_id_;
172 if (initialSequence == -1) {
173 initialSequence = ringbuffer_->tail_sequence().get() + 1;
174 }
175 sequence_ = initialSequence;
176 }
177
178 virtual ~MessageRunner() = default;
179
180 void next()
181 {
182 if (cancelled_) {
183 return;
184 }
185
186 auto runner = this->shared_from_this();
187 auto execution_service = this->execution_service_;
188 ringbuffer_->read_many(sequence_, 1, batch_size_)
189 .then(executor_, [runner,execution_service](boost::future<rb::read_result_set> f) {
190 if (runner->cancelled_) {
191 return;
192 }
193
194 try {
195 auto result = f.get();
196
197 // we process all messages in batch. So we don't release
198 // the thread and reschedule ourselves; but we'll process
199 // whatever was received in 1 go.
200 auto lost_count =
201 result.get_next_sequence_to_read_from() -
202 result.read_count() - runner->sequence_;
203 if (lost_count != 0 &&
204 !runner->listener_.loss_tolerant_) {
205 runner->cancel();
206 return;
207 }
208
209 auto const& items = result.get_items();
210 for (size_t i = 0; i < items.size(); i++) {
211 auto const& message = items[i];
212 try {
213 runner->listener_.store_sequence_id_(
214 result.get_sequence(static_cast<int>(i)));
215 auto rel_msg = message.get<
216 topic::impl::reliable::ReliableTopicMessage>();
217 runner->process(*rel_msg);
218 } catch (exception::iexception& e) {
219 if (runner->terminate(e)) {
220 runner->cancel();
221 return;
222 }
223 }
224 }
225
226 runner->sequence_ =
227 result.get_next_sequence_to_read_from();
228 runner->next();
229 } catch (exception::iexception& ie) {
230 if (runner->handle_internal_exception(ie)) {
231 runner->next();
232 } else {
233 runner->cancel();
234 }
235 }
236 });
237 }
238
239 bool handle_operation_timeout_exception()
240 {
241 HZ_LOG(logger_,
242 finest,
243 boost::str(
244 boost::format("MessageListener on topic: %1% timed out. "
245 "Continuing from last known sequence: %2%") %
246 name_ % sequence_));
247
248 return true;
249 }
250
261 bool handle_illegal_argument_exception(exception::iexception& e)
262 {
263 // stale_sequence_exception.getHeadSeq() is not available on the
264 // client-side, see #7317
265 int64_t remoteHeadSeq = ringbuffer_->head_sequence().get();
266
267 if (listener_.loss_tolerant_) {
268 HZ_LOG(logger_,
269 finest,
270 boost::str(
271 boost::format(
272 "Terminating MessageListener: %1% on topic: %2% ."
273 "Reason: Underlying ring buffer data related to "
274 "reliable topic is lost.") %
275 id_ % name_ % e.what() % sequence_ % remoteHeadSeq));
276 sequence_ = remoteHeadSeq;
277 return true;
278 }
279
280 HZ_LOG(
281 logger_,
282 warning,
283 boost::str(boost::format(
284 "Terminating MessageListener: %1% on topic: %2%"
285 "Reason: The listener was too slow or the retention "
286 "period of the message has been violated. "
287 "head: %3% sequence: %4%") %
288 id_ % name_ % remoteHeadSeq % sequence_));
289 return false;
290 }
291
298 bool handle_internal_exception(exception::iexception& ie)
299 {
300 int32_t err = ie.get_error_code();
301
302 switch (err) {
303 case protocol::TIMEOUT:
304 return handle_operation_timeout_exception();
305 case protocol::ILLEGAL_ARGUMENT:
306 return handle_illegal_argument_exception(ie);
307 case protocol::HAZELCAST_INSTANCE_NOT_ACTIVE:
308 HZ_LOG(logger_,
309 finest,
310 boost::str(
311 boost::format(
312 "Terminating MessageListener %1% on topic: %2%. "
313 "Reason: HazelcastInstance is shutting down") %
314 id_ % name_));
315 break;
316 case protocol::DISTRIBUTED_OBJECT_DESTROYED:
317 HZ_LOG(logger_,
318 finest,
319 boost::str(
320 boost::format(
321 "Terminating MessageListener %1% on topic: %2%. "
322 "Reason: Topic is destroyed") %
323 id_ % name_));
324 break;
325 default:
326 HZ_LOG(logger_,
327 warning,
328 boost::str(
329 boost::format(
330 "Terminating MessageListener %1% on topic: %2%. "
331 "Reason: Unhandled exception, details: %3%") %
332 id_ % name_ % ie.what()));
333 }
334 return false;
335 }
336
337 bool cancel() override
338 {
339 cancelled_.store(true);
340 auto topic_ptr = topic_.lock();
341 if (topic_ptr) {
342 topic_ptr->runners_map_.remove(id_);
343 }
344 listener_.on_cancel_();
345 return true;
346 }
347
348 bool is_cancelled() override { return cancelled_.load(); }
349
350 private:
351 void process(topic::impl::reliable::ReliableTopicMessage& message)
352 {
353 listener_.received_(to_message(message));
354 }
355
356 topic::message to_message(
357 topic::impl::reliable::ReliableTopicMessage& message)
358 {
359 boost::optional<member> m;
360 auto& addr = message.get_publisher_address();
361 if (addr.has_value()) {
362 m = boost::make_optional<member>(addr.value());
363 }
364 return topic::message(name_,
365 typed_data(std::move(message.get_payload()),
366 serialization_service_),
367 message.get_publish_time(),
368 std::move(m));
369 }
370
371 bool terminate(const exception::iexception& failure)
372 {
373 if (cancelled_) {
374 return true;
375 }
376
377 try {
378 bool terminate = listener_.terminal_(failure);
379 if (terminate) {
380 HZ_LOG(logger_,
381 warning,
382 boost::str(
383 boost::format(
384 "Terminating ReliableListener %1% "
385 "on topic: %2%. "
386 "Reason: Unhandled exception, details: %3%") %
387 id_ % name_ % failure.what()));
388 } else {
389 HZ_LOG(logger_,
390 finest,
391 boost::str(boost::format(
392 "ReliableListener %1% on topic: %2% "
393 "ran into an exception, details: %3%") %
394 id_ % name_ % failure.what()));
395 }
396 return terminate;
397 } catch (exception::iexception& t) {
398 HZ_LOG(logger_,
399 warning,
400 boost::str(
401 boost::format(
402 "Terminating ReliableListener %1% on topic: %2%. "
403 "Reason: Unhandled exception while calling the "
404 "function set by "
405 "ReliableListener::terminate_on_exception. %3%") %
406 id_ % name_ % t.what()));
407 return true;
408 }
409 }
410
411 private:
412 Listener listener_;
413 int id_;
414 std::shared_ptr<ringbuffer> ringbuffer_;
415 int64_t sequence_;
416 std::atomic<bool> cancelled_;
417 logger& logger_;
418 const std::string& name_;
419 std::shared_ptr<spi::impl::ClientExecutionServiceImpl>
420 execution_service_;
421 util::hz_thread_pool& executor_;
422 serialization::pimpl::SerializationService& serialization_service_;
423 int batch_size_;
424 std::weak_ptr<reliable_topic> topic_;
425 };
426
427 util::SynchronizedMap<int, util::concurrent::Cancellable> runners_map_;
428 std::atomic<int> runner_counter_{ 0 };
429 std::shared_ptr<spi::impl::ClientExecutionServiceImpl> execution_service_;
430 util::hz_thread_pool& executor_;
431 logger& logger_;
432 int batch_size_;
433 boost::shared_future<std::shared_ptr<ringbuffer>> ringbuffer_;
434};
435} // namespace client
436} // namespace hazelcast
437
438#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
439#pragma warning(pop)
440#endif
Hazelcast provides a distribution mechanism for publishing messages that are delivered to multiple subs...
boost::future< void > publish(const E &message)
Publishes the message to all subscribers of this topic. Current implementation only supports DISCARD_O...
std::string add_message_listener(Listener &&listener)
Subscribes to this topic.
STL namespace.