Hazelcast C++ Client
Hazelcast C++ Client Library
cp_impl.cpp
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <boost/thread/shared_lock_guard.hpp>
18 #include <boost/container_hash/hash.hpp>
19 
20 #include "hazelcast/cp/cp_impl.h"
21 #include "hazelcast/cp/cp.h"
22 #include "hazelcast/client/exception/protocol_exceptions.h"
23 #include "hazelcast/client/protocol/codec/codecs.h"
24 #include "hazelcast/client/spi/ClientContext.h"
25 #include "hazelcast/client/spi/impl/ClientInvocation.h"
26 #include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
27 #include "hazelcast/client/proxy/SerializingProxy.h"
28 
29 namespace hazelcast {
30 namespace cp {
31 namespace internal {
32 namespace session {
33 constexpr int64_t proxy_session_manager::NO_SESSION_ID;
34 constexpr int64_t proxy_session_manager::SHUTDOWN_TIMEOUT_SECONDS;
35 proxy_session_manager::proxy_session_manager(
36  hazelcast::client::spi::ClientContext& client)
37  : client_(client)
38 {}
39 
40 int64_t
41 proxy_session_manager::acquire_session(const raft_group_id& group_id)
42 {
43  return get_or_create_session(group_id).acquire(1);
44 }
45 
46 int64_t
47 proxy_session_manager::acquire_session(const raft_group_id& group_id,
48  int32_t count)
49 {
50  return get_or_create_session(group_id).acquire(count);
51 }
52 
53 proxy_session_manager::session_state&
54 proxy_session_manager::get_or_create_session(const raft_group_id& group_id)
55 {
56  boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
57 
58  if (!running_) {
59  BOOST_THROW_EXCEPTION(client::exception::hazelcast_instance_not_active(
60  "proxy_session_manager::get_or_create_session",
61  "Session manager is already shut down!"));
62  }
63 
64  auto session = sessions_.find(group_id);
65  if (session == sessions_.end() || !session->second.is_valid()) {
66  // upgrade to write lock
67  boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(
68  read_lock);
69  session = sessions_.find(group_id);
70  if (session == sessions_.end() || !session->second.is_valid()) {
71  session = create_new_session(group_id);
72  }
73  }
74  return session->second;
75 }
76 
77 std::unordered_map<raft_group_id,
78  proxy_session_manager::session_state>::iterator
79 proxy_session_manager::create_new_session(const raft_group_id& group_id)
80 {
81  // the lock_ is already acquired as write lock
82  auto response = request_new_session(group_id);
83  session_state state { response.id , response.ttl_millis };
84 
85  auto result = sessions_.emplace( group_id, state );
86 
87  if(!result.second)
88  result.first->second = state;
89 
90  schedule_heartbeat_task(response.heartbeat_millis);
91  return result.first;
92 }
93 
94 proxy_session_manager::session_response
95 proxy_session_manager::request_new_session(const raft_group_id& group_id)
96 {
97  auto request = client::protocol::codec::cpsession_createsession_encode(
98  group_id, client_.get_name());
99  auto response = client::spi::impl::ClientInvocation::create(
100  client_, request, "sessionManager")
101  ->invoke()
102  .get();
103  auto session_id = response.get_first_fixed_sized_field<int64_t>();
104  auto ttl_millis = response.get<int64_t>();
105  auto hearbeat_millis = response.get<int64_t>();
106  return { session_id, ttl_millis, hearbeat_millis };
107 }
108 
109 void
110 proxy_session_manager::schedule_heartbeat_task(int64_t hearbeat_millis)
111 {
112  bool current = false;
113  if (scheduled_heartbeat_.compare_exchange_strong(current, true)) {
114  auto prev_heartbeats =
115  std::make_shared<std::vector<boost::future<void>>>();
116  auto duration = std::chrono::milliseconds(hearbeat_millis);
117  heartbeat_timer_ =
118  client_.get_client_execution_service().schedule_with_repetition(
119  [=]() {
120  // we can not cancel a future
121  prev_heartbeats->clear();
122  std::vector<std::tuple<raft_group_id, int64_t, bool>> sessions;
123  {
124  boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
125  for (const auto& s : sessions_) {
126  sessions.emplace_back(
127  s.first, s.second.id, s.second.is_in_use());
128  }
129  }
130  for (const auto& entry : sessions) {
131  raft_group_id group_id;
132  int64_t session_id;
133  bool in_use;
134  std::tie(group_id, session_id, in_use) = entry;
135  if (in_use) {
136  prev_heartbeats->emplace_back(
137  heartbeat(group_id, session_id)
138  .then(
139  boost::launch::sync,
140  [=](boost::future<client::protocol::ClientMessage>
141  f) {
142  try {
143  f.get();
144  } catch (
145  client::exception::session_expired&) {
146  invalidate_session(group_id, session_id);
147  } catch (
148  client::exception::cp_group_destroyed&) {
149  invalidate_session(group_id, session_id);
150  }
151  }));
152  }
153  }
154  },
155  duration,
156  duration);
157  }
158 }
159 
160 boost::future<client::protocol::ClientMessage>
161 proxy_session_manager::heartbeat(const raft_group_id& group_id,
162  int64_t session_id)
163 {
164  auto request = client::protocol::codec::cpsession_heartbeatsession_encode(
165  group_id, session_id);
166  return client::spi::impl::ClientInvocation::create(
167  client_, request, "sessionManager")
168  ->invoke();
169 }
170 
171 void
172 proxy_session_manager::invalidate_session(const raft_group_id& group_id,
173  int64_t session_id)
174 {
175  {
176  boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
177  auto session = sessions_.find(group_id);
178  if (session != sessions_.end()) {
179  // upgrade to write lock
180  boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(
181  read_lock);
182  sessions_.erase(session);
183  }
184  }
185 }
186 
187 void
188 proxy_session_manager::release_session(const raft_group_id& group_id,
189  int64_t session_id)
190 {
191  release_session(group_id, session_id, 1);
192 }
193 
194 void
195 proxy_session_manager::release_session(const raft_group_id& group_id,
196  int64_t session_id,
197  int32_t count)
198 {
199  boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
200  auto session = sessions_.find(group_id);
201  if (session != sessions_.end() && session->second.id == session_id) {
202  session->second.release(count);
203  }
204 }
205 
206 int64_t
207 proxy_session_manager::get_session(const raft_group_id& group_id)
208 {
209  boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
210  auto session = sessions_.find(group_id);
211  return session == sessions_.end() ? NO_SESSION_ID : session->second.id;
212 }
213 
214 int64_t
215 proxy_session_manager::get_or_create_unique_thread_id(
216  const raft_group_id& group_id)
217 {
218  boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
219  auto key = std::make_pair(group_id, util::get_current_thread_id());
220  auto global_thread_id_it = thread_ids_.find(key);
221  if (global_thread_id_it != thread_ids_.end()) {
222  return global_thread_id_it->second;
223  }
224 
225  auto global_thread_id = generate_thread_id(group_id);
226 
227  // upgrade to write lock
228  boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(read_lock);
229  global_thread_id_it = thread_ids_.find(key);
230  if (global_thread_id_it != thread_ids_.end()) {
231  return global_thread_id_it->second;
232  }
233 
234  thread_ids_.emplace(std::move(key), global_thread_id);
235  return global_thread_id;
236 }
237 
238 void
239 proxy_session_manager::shutdown()
240 {
241  boost::unique_lock<boost::shared_mutex> write_lock(lock_);
242  if (scheduled_heartbeat_ && heartbeat_timer_) {
243  heartbeat_timer_->cancel();
244  }
245 
246  std::vector<boost::future<client::protocol::ClientMessage>> invocations;
247  for (const auto& s : sessions_) {
248  invocations.emplace_back(close_session(s.first, s.second.id));
249  }
250 
251  auto waiting_future =
252  boost::when_all(invocations.begin(), invocations.end());
253 
254  waiting_future.wait_for(boost::chrono::seconds(SHUTDOWN_TIMEOUT_SECONDS));
255 
256  sessions_.clear();
257  running_ = false;
258 }
259 
260 boost::future<client::protocol::ClientMessage>
261 proxy_session_manager::close_session(const raft_group_id& group_id,
262  int64_t session_id)
263 {
264  auto request = client::protocol::codec::cpsession_closesession_encode(
265  group_id, session_id);
266  return client::spi::impl::ClientInvocation::create(
267  client_, request, "sessionManager")
268  ->invoke();
269 }
270 
271 int64_t
272 proxy_session_manager::get_session_acquire_count(const raft_group_id& group_id,
273  int64_t session_id)
274 {
275  boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
276  auto session = sessions_.find(group_id);
277  return session != sessions_.end() && session->second.id == session_id
278  ? session->second.acquire_count.load()
279  : 0;
280 }
281 
282 int64_t
283 proxy_session_manager::generate_thread_id(const raft_group_id& group_id)
284 {
285  auto request =
286  client::protocol::codec::cpsession_generatethreadid_encode(group_id);
287  return client::spi::impl::ClientInvocation::create(
288  client_, request, "sessionManager")
289  ->invoke()
290  .get()
291  .get_first_fixed_sized_field<int64_t>();
292 }
293 
294 bool
295 proxy_session_manager::session_state::is_valid() const
296 {
297  return is_in_use() || !is_expired();
298 }
299 
300 bool
301 proxy_session_manager::session_state::is_in_use() const
302 {
303  return acquire_count.load() > 0;
304 }
305 
306 bool
307 proxy_session_manager::session_state::is_expired() const
308 {
309  auto expirationTime = creation_time + ttl;
310  if (expirationTime.time_since_epoch().count() < 0) {
311  expirationTime = (std::chrono::steady_clock::time_point::max)();
312  }
313  return std::chrono::steady_clock::now() > expirationTime;
314 }
315 
316 proxy_session_manager::session_state::session_state(int64_t id,
317  int64_t ttl_millis)
318  : id(id)
319  , ttl(ttl_millis)
320  , creation_time(std::chrono::steady_clock::now())
321 {}
322 
323 proxy_session_manager::session_state::session_state(const session_state& rhs)
324  : id(rhs.id)
325  , ttl(rhs.ttl)
326  , creation_time(rhs.creation_time)
327  , acquire_count(rhs.acquire_count.load())
328 {}
329 
330 proxy_session_manager::session_state&
331 proxy_session_manager::session_state::operator=(const session_state& rhs)
332 {
333  id = rhs.id;
334  ttl = rhs.ttl;
335  creation_time = rhs.creation_time;
336  acquire_count = rhs.acquire_count.load();
337 
338  return *this;
339 }
340 
341 int64_t
342 proxy_session_manager::session_state::acquire(int32_t count)
343 {
344  acquire_count.fetch_add(count);
345  return id;
346 }
347 
348 void
349 proxy_session_manager::session_state::release(int32_t count)
350 {
351  acquire_count.fetch_sub(count);
352 }
353 
354 } // namespace session
355 } // namespace internal
356 } // namespace cp
357 } // namespace hazelcast