Hazelcast C++ Client
Hazelcast C++ Client Library
cp_impl.cpp
1 #include <hazelcast/client/spi/ClientContext.h>
2 
3 /*
4  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 #include <boost/thread/shared_lock_guard.hpp>
19 #include <boost/container_hash/hash.hpp>
20 
21 #include "hazelcast/cp/cp_impl.h"
22 #include "hazelcast/cp/cp.h"
23 #include "hazelcast/client/exception/protocol_exceptions.h"
24 #include "hazelcast/client/protocol/codec/codecs.h"
25 #include "hazelcast/client/spi/impl/ClientInvocation.h"
26 #include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
27 #include "hazelcast/client/proxy/SerializingProxy.h"
28 
29 namespace hazelcast {
30  namespace cp {
31  namespace internal {
32  namespace session {
33  constexpr int64_t proxy_session_manager::NO_SESSION_ID;
34  constexpr int64_t proxy_session_manager::SHUTDOWN_TIMEOUT_SECONDS;
35  proxy_session_manager::proxy_session_manager(
36  hazelcast::client::spi::ClientContext &client) : client_(client) {}
37 
38  int64_t proxy_session_manager::acquire_session(const raft_group_id &group_id) {
39  return get_or_create_session(group_id).acquire(1);
40  }
41 
42  int64_t proxy_session_manager::acquire_session(const raft_group_id &group_id, int32_t count) {
43  return get_or_create_session(group_id).acquire(count);
44  }
45 
46  proxy_session_manager::session_state &
47  proxy_session_manager::get_or_create_session(const raft_group_id &group_id) {
48  boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
49 
50  if (!running_) {
51  BOOST_THROW_EXCEPTION(client::exception::hazelcast_instance_not_active(
52  "proxy_session_manager::get_or_create_session",
53  "Session manager is already shut down!"));
54  }
55 
56  auto session = sessions_.find(group_id);
57  if (session == sessions_.end() || !session->second.is_valid()) {
58  // upgrade to write lock
59  boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(read_lock);
60  session = sessions_.find(group_id);
61  if (session == sessions_.end() || !session->second.is_valid()) {
62  session = create_new_session(group_id);
63  }
64  }
65  return session->second;
66  }
67 
68  std::unordered_map<raft_group_id, proxy_session_manager::session_state>::iterator
69  proxy_session_manager::create_new_session(const raft_group_id &group_id) {
70  // the lock_ is already acquired as write lock
71  auto response = request_new_session(group_id);
72  auto result = sessions_.emplace(group_id, session_state{response.id, response.ttl_millis});
73  assert(result.second);
74  auto session = sessions_.emplace(group_id, session_state{response.id, response.ttl_millis}).first;
75  schedule_heartbeat_task(response.heartbeat_millis);
76  return session;
77  }
78 
79  proxy_session_manager::session_response
80  proxy_session_manager::request_new_session(const raft_group_id &group_id) {
81  auto request = client::protocol::codec::cpsession_createsession_encode(group_id,
82  client_.get_name());
83  auto response = client::spi::impl::ClientInvocation::create(client_, request,
84  "sessionManager")->invoke().get();
85  auto session_id = response.get_first_fixed_sized_field<int64_t>();
86  auto ttl_millis = response.get<int64_t>();
87  auto hearbeat_millis = response.get<int64_t>();
88  return {session_id, ttl_millis, hearbeat_millis};
89  }
90 
91  void proxy_session_manager::schedule_heartbeat_task(int64_t hearbeat_millis) {
92  bool current = false;
93  if (scheduled_heartbeat_.compare_exchange_strong(current, true)) {
94  auto prev_heartbeats = std::make_shared<std::vector<boost::future<void>>>();
95  auto duration = std::chrono::milliseconds(hearbeat_millis);
96  heartbeat_timer_ = client_.get_client_execution_service().schedule_with_repetition([=]() {
97  // we can not cancel a future
98  prev_heartbeats->clear();
99  std::vector<std::tuple<raft_group_id, int64_t, bool>> sessions;
100  {
101  boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
102  for (const auto &s : sessions_) {
103  sessions.emplace_back(s.first, s.second.id, s.second.is_in_use());
104  }
105  }
106  for (const auto &entry : sessions) {
107  raft_group_id group_id;
108  int64_t session_id;
109  bool in_use;
110  std::tie(group_id, session_id, in_use) = entry;
111  if (in_use) {
112  prev_heartbeats->emplace_back(
113  heartbeat(group_id, session_id).then(boost::launch::sync,
114  [=](boost::future<client::protocol::ClientMessage> f) {
115  try {
116  f.get();
117  } catch (
118  client::exception::session_expired &) {
119  invalidate_session(group_id,
120  session_id);
121  } catch (
122  client::exception::cp_group_destroyed &) {
123  invalidate_session(group_id,
124  session_id);
125  }
126  }));
127  }
128  }
129  }, duration, duration);
130  }
131  }
132 
133  boost::future<client::protocol::ClientMessage>
134  proxy_session_manager::heartbeat(const raft_group_id &group_id, int64_t session_id) {
135  auto request = client::protocol::codec::cpsession_heartbeatsession_encode(group_id, session_id);
136  return client::spi::impl::ClientInvocation::create(client_, request, "sessionManager")->invoke();
137  }
138 
139  void proxy_session_manager::invalidate_session(const raft_group_id &group_id, int64_t session_id) {
140  {
141  boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
142  auto session = sessions_.find(group_id);
143  if (session != sessions_.end()) {
144  // upgrade to write lock
145  boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(read_lock);
146  sessions_.erase(session);
147  }
148  }
149  }
150 
151  void proxy_session_manager::release_session(const raft_group_id &group_id, int64_t session_id) {
152  release_session(group_id, session_id, 1);
153  }
154 
155  void proxy_session_manager::release_session(const raft_group_id &group_id, int64_t session_id,
156  int32_t count) {
157  boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
158  auto session = sessions_.find(group_id);
159  if (session != sessions_.end() && session->second.id == session_id) {
160  session->second.release(count);
161  }
162  }
163 
164  int64_t proxy_session_manager::get_session(const raft_group_id &group_id) {
165  boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
166  auto session = sessions_.find(group_id);
167  return session == sessions_.end() ? NO_SESSION_ID : session->second.id;
168  }
169 
170  int64_t proxy_session_manager::get_or_create_unique_thread_id(const raft_group_id &group_id) {
171  boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
172  auto key = std::make_pair(group_id, util::get_current_thread_id());
173  auto global_thread_id_it = thread_ids_.find(key);
174  if (global_thread_id_it != thread_ids_.end()) {
175  return global_thread_id_it->second;
176  }
177 
178  auto global_thread_id = generate_thread_id(group_id);
179 
180  // upgrade to write lock
181  boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(read_lock);
182  global_thread_id_it = thread_ids_.find(key);
183  if (global_thread_id_it != thread_ids_.end()) {
184  return global_thread_id_it->second;
185  }
186 
187  thread_ids_.emplace(std::move(key), global_thread_id);
188  return global_thread_id;
189  }
190 
191  void proxy_session_manager::shutdown() {
192  boost::unique_lock<boost::shared_mutex> write_lock(lock_);
193  if (scheduled_heartbeat_ && heartbeat_timer_) {
194  heartbeat_timer_->cancel();
195  }
196 
197  std::vector<boost::future<client::protocol::ClientMessage>> invocations;
198  for (const auto &s : sessions_) {
199  invocations.emplace_back(close_session(s.first, s.second.id));
200  }
201 
202  auto waiting_future = boost::when_all(invocations.begin(), invocations.end());
203 
204  waiting_future.wait_for(boost::chrono::seconds(SHUTDOWN_TIMEOUT_SECONDS));
205 
206  sessions_.clear();
207  running_ = false;
208  }
209 
210  boost::future<client::protocol::ClientMessage>
211  proxy_session_manager::close_session(const raft_group_id &group_id, int64_t session_id) {
212  auto request = client::protocol::codec::cpsession_closesession_encode(group_id, session_id);
213  return client::spi::impl::ClientInvocation::create(client_, request, "sessionManager")->invoke();
214  }
215 
216  int64_t proxy_session_manager::get_session_acquire_count(const raft_group_id &group_id, int64_t session_id) {
217  boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
218  auto session = sessions_.find(group_id);
219  return session != sessions_.end() && session->second.id == session_id ? session->second.acquire_count.load() : 0;
220  }
221 
222  int64_t proxy_session_manager::generate_thread_id(const raft_group_id &group_id) {
223  auto request = client::protocol::codec::cpsession_generatethreadid_encode(group_id);
224  return client::spi::impl::ClientInvocation::create(client_, request,
225  "sessionManager")->invoke().get().get_first_fixed_sized_field<int64_t>();
226  }
227 
228  bool proxy_session_manager::session_state::is_valid() const {
229  return is_in_use() || !is_expired();
230  }
231 
232  bool proxy_session_manager::session_state::is_in_use() const {
233  return acquire_count.load() > 0;
234  }
235 
236  bool proxy_session_manager::session_state::is_expired() const {
237  auto expirationTime = creation_time + ttl;
238  if (expirationTime.time_since_epoch().count() < 0) {
239  expirationTime = (std::chrono::steady_clock::time_point::max)();
240  }
241  return std::chrono::steady_clock::now() > expirationTime;
242  }
243 
244  proxy_session_manager::session_state::session_state(int64_t id, int64_t ttl_millis)
245  : id(id), ttl(ttl_millis), creation_time(std::chrono::steady_clock::now()) {}
246 
247  proxy_session_manager::session_state::session_state(const session_state &rhs)
248  : id(rhs.id), ttl(rhs.ttl), creation_time(rhs.creation_time),
249  acquire_count(rhs.acquire_count.load()) {}
250 
251  int64_t proxy_session_manager::session_state::acquire(int32_t count) {
252  acquire_count.fetch_add(count);
253  return id;
254  }
255 
256  void proxy_session_manager::session_state::release(int32_t count) {
257  acquire_count.fetch_sub(count);
258  }
259 
260  }
261  }
262  }
263 }