Hazelcast C++ Client
Hazelcast C++ Client Library
Loading...
Searching...
No Matches
cp_impl.cpp
1/*
2 * Copyright (c) 2008-2025, Hazelcast, Inc. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <boost/thread/shared_lock_guard.hpp>
18#include <boost/container_hash/hash.hpp>
19
20#include "hazelcast/cp/cp_impl.h"
21#include "hazelcast/cp/cp.h"
22#include "hazelcast/client/exception/protocol_exceptions.h"
23#include "hazelcast/client/protocol/codec/codecs.h"
24#include "hazelcast/client/spi/ClientContext.h"
25#include "hazelcast/client/spi/impl/ClientInvocation.h"
26#include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
27#include "hazelcast/client/proxy/SerializingProxy.h"
28
29namespace hazelcast {
30namespace cp {
31namespace internal {
32namespace session {
33constexpr int64_t proxy_session_manager::NO_SESSION_ID;
34constexpr int64_t proxy_session_manager::SHUTDOWN_TIMEOUT_SECONDS;
35proxy_session_manager::proxy_session_manager(
36 hazelcast::client::spi::ClientContext& client)
37 : client_(client)
38{}
39
40int64_t
41proxy_session_manager::acquire_session(const raft_group_id& group_id)
42{
43 return get_or_create_session(group_id).acquire(1);
44}
45
46int64_t
47proxy_session_manager::acquire_session(const raft_group_id& group_id,
48 int32_t count)
49{
50 return get_or_create_session(group_id).acquire(count);
51}
52
53proxy_session_manager::session_state&
54proxy_session_manager::get_or_create_session(const raft_group_id& group_id)
55{
56 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
57
58 if (!running_) {
59 BOOST_THROW_EXCEPTION(client::exception::hazelcast_instance_not_active(
60 "proxy_session_manager::get_or_create_session",
61 "Session manager is already shut down!"));
62 }
63
64 auto session = sessions_.find(group_id);
65 if (session == sessions_.end() || !session->second.is_valid()) {
66 // upgrade to write lock
67 boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(
68 read_lock);
69 session = sessions_.find(group_id);
70 if (session == sessions_.end() || !session->second.is_valid()) {
71 session = create_new_session(group_id);
72 }
73 }
74 return session->second;
75}
76
77std::unordered_map<raft_group_id,
78 proxy_session_manager::session_state>::iterator
79proxy_session_manager::create_new_session(const raft_group_id& group_id)
80{
81 // the lock_ is already acquired as write lock
82 auto response = request_new_session(group_id);
83 session_state state { response.id , response.ttl_millis };
84
85 auto result = sessions_.emplace( group_id, state );
86
87 if(!result.second)
88 result.first->second = state;
89
90 schedule_heartbeat_task(response.heartbeat_millis);
91 return result.first;
92}
93
94proxy_session_manager::session_response
95proxy_session_manager::request_new_session(const raft_group_id& group_id)
96{
97 auto request = client::protocol::codec::cpsession_createsession_encode(
98 group_id, client_.get_name());
99 auto response = client::spi::impl::ClientInvocation::create(
100 client_, request, "sessionManager")
101 ->invoke()
102 .get();
103 auto session_id = response.get_first_fixed_sized_field<int64_t>();
104 auto ttl_millis = response.get<int64_t>();
105 auto hearbeat_millis = response.get<int64_t>();
106 return { session_id, ttl_millis, hearbeat_millis };
107}
108
109void
110proxy_session_manager::schedule_heartbeat_task(int64_t hearbeat_millis)
111{
112 bool current = false;
113 if (scheduled_heartbeat_.compare_exchange_strong(current, true)) {
114 auto prev_heartbeats =
115 std::make_shared<std::vector<boost::future<void>>>();
116 auto duration = std::chrono::milliseconds(hearbeat_millis);
117 heartbeat_timer_ =
118 client_.get_client_execution_service().schedule_with_repetition(
119 [=]() {
120 // we can not cancel a future
121 prev_heartbeats->clear();
122 std::vector<std::tuple<raft_group_id, int64_t, bool>> sessions;
123 {
124 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
125 for (const auto& s : sessions_) {
126 sessions.emplace_back(
127 s.first, s.second.id, s.second.is_in_use());
128 }
129 }
130 for (const auto& entry : sessions) {
131 raft_group_id group_id;
132 int64_t session_id;
133 bool in_use;
134 std::tie(group_id, session_id, in_use) = entry;
135 if (in_use) {
136 prev_heartbeats->emplace_back(
137 heartbeat(group_id, session_id)
138 .then(
139 boost::launch::sync,
140 [=](boost::future<client::protocol::ClientMessage>
141 f) {
142 try {
143 f.get();
144 } catch (
145 client::exception::session_expired&) {
146 invalidate_session(group_id, session_id);
147 } catch (
148 client::exception::cp_group_destroyed&) {
149 invalidate_session(group_id, session_id);
150 }
151 }));
152 }
153 }
154 },
155 duration,
156 duration);
157 }
158}
159
160boost::future<client::protocol::ClientMessage>
161proxy_session_manager::heartbeat(const raft_group_id& group_id,
162 int64_t session_id)
163{
164 auto request = client::protocol::codec::cpsession_heartbeatsession_encode(
165 group_id, session_id);
166 return client::spi::impl::ClientInvocation::create(
167 client_, request, "sessionManager")
168 ->invoke();
169}
170
171void
172proxy_session_manager::invalidate_session(const raft_group_id& group_id,
173 int64_t /* session_id */)
174{
175 {
176 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
177 auto session = sessions_.find(group_id);
178 if (session != sessions_.end()) {
179 // upgrade to write lock
180 boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(
181 read_lock);
182 sessions_.erase(session);
183 }
184 }
185}
186
187void
188proxy_session_manager::release_session(const raft_group_id& group_id,
189 int64_t session_id)
190{
191 release_session(group_id, session_id, 1);
192}
193
194void
195proxy_session_manager::release_session(const raft_group_id& group_id,
196 int64_t session_id,
197 int32_t count)
198{
199 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
200 auto session = sessions_.find(group_id);
201 if (session != sessions_.end() && session->second.id == session_id) {
202 session->second.release(count);
203 }
204}
205
206int64_t
207proxy_session_manager::get_session(const raft_group_id& group_id)
208{
209 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
210 auto session = sessions_.find(group_id);
211 return session == sessions_.end() ? NO_SESSION_ID : session->second.id;
212}
213
214int64_t
215proxy_session_manager::get_or_create_unique_thread_id(
216 const raft_group_id& group_id)
217{
218 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
219 auto key = std::make_pair(group_id, util::get_current_thread_id());
220 auto global_thread_id_it = thread_ids_.find(key);
221 if (global_thread_id_it != thread_ids_.end()) {
222 return global_thread_id_it->second;
223 }
224
225 auto global_thread_id = generate_thread_id(group_id);
226
227 // upgrade to write lock
228 boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(read_lock);
229 global_thread_id_it = thread_ids_.find(key);
230 if (global_thread_id_it != thread_ids_.end()) {
231 return global_thread_id_it->second;
232 }
233
234 thread_ids_.emplace(std::move(key), global_thread_id);
235 return global_thread_id;
236}
237
238void
239proxy_session_manager::shutdown()
240{
241 boost::unique_lock<boost::shared_mutex> write_lock(lock_);
242 if (scheduled_heartbeat_ && heartbeat_timer_) {
243 heartbeat_timer_->cancel();
244 }
245
246 std::vector<boost::future<client::protocol::ClientMessage>> invocations;
247 for (const auto& s : sessions_) {
248 invocations.emplace_back(close_session(s.first, s.second.id));
249 }
250
251 auto waiting_future =
252 boost::when_all(invocations.begin(), invocations.end());
253
254 waiting_future.wait_for(boost::chrono::seconds(SHUTDOWN_TIMEOUT_SECONDS));
255
256 sessions_.clear();
257 running_ = false;
258}
259
260boost::future<client::protocol::ClientMessage>
261proxy_session_manager::close_session(const raft_group_id& group_id,
262 int64_t session_id)
263{
264 auto request = client::protocol::codec::cpsession_closesession_encode(
265 group_id, session_id);
266 return client::spi::impl::ClientInvocation::create(
267 client_, request, "sessionManager")
268 ->invoke();
269}
270
271int64_t
272proxy_session_manager::get_session_acquire_count(const raft_group_id& group_id,
273 int64_t session_id)
274{
275 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
276 auto session = sessions_.find(group_id);
277 return session != sessions_.end() && session->second.id == session_id
278 ? session->second.acquire_count.load()
279 : 0;
280}
281
282int64_t
283proxy_session_manager::generate_thread_id(const raft_group_id& group_id)
284{
285 auto request =
286 client::protocol::codec::cpsession_generatethreadid_encode(group_id);
287 return client::spi::impl::ClientInvocation::create(
288 client_, request, "sessionManager")
289 ->invoke()
290 .get()
291 .get_first_fixed_sized_field<int64_t>();
292}
293
294bool
295proxy_session_manager::session_state::is_valid() const
296{
297 return is_in_use() || !is_expired();
298}
299
300bool
301proxy_session_manager::session_state::is_in_use() const
302{
303 return acquire_count.load() > 0;
304}
305
306bool
307proxy_session_manager::session_state::is_expired() const
308{
309 auto expirationTime = creation_time + ttl;
310 if (expirationTime.time_since_epoch().count() < 0) {
311 expirationTime = (std::chrono::steady_clock::time_point::max)();
312 }
313 return std::chrono::steady_clock::now() > expirationTime;
314}
315
316proxy_session_manager::session_state::session_state(int64_t id,
317 int64_t ttl_millis)
318 : id(id)
319 , ttl(ttl_millis)
320 , creation_time(std::chrono::steady_clock::now())
321{}
322
323proxy_session_manager::session_state::session_state(const session_state& rhs)
324 : id(rhs.id)
325 , ttl(rhs.ttl)
326 , creation_time(rhs.creation_time)
327 , acquire_count(rhs.acquire_count.load())
328{}
329
330proxy_session_manager::session_state&
331proxy_session_manager::session_state::operator=(const session_state& rhs)
332{
333 id = rhs.id;
334 ttl = rhs.ttl;
335 creation_time = rhs.creation_time;
336 acquire_count = rhs.acquire_count.load();
337
338 return *this;
339}
340
341int64_t
342proxy_session_manager::session_state::acquire(int32_t count)
343{
344 acquire_count.fetch_add(count);
345 return id;
346}
347
348void
349proxy_session_manager::session_state::release(int32_t count)
350{
351 acquire_count.fetch_sub(count);
352}
353
354} // namespace session
355} // namespace internal
356} // namespace cp
357} // namespace hazelcast
STL namespace.