17 #include <boost/thread/shared_lock_guard.hpp>
18 #include <boost/container_hash/hash.hpp>
20 #include "hazelcast/cp/cp_impl.h"
21 #include "hazelcast/cp/cp.h"
22 #include "hazelcast/client/exception/protocol_exceptions.h"
23 #include "hazelcast/client/protocol/codec/codecs.h"
24 #include "hazelcast/client/spi/ClientContext.h"
25 #include "hazelcast/client/spi/impl/ClientInvocation.h"
26 #include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
27 #include "hazelcast/client/proxy/SerializingProxy.h"
// Out-of-class definitions of the constexpr static data members of
// proxy_session_manager. No initializer is given here, so the values come
// from the in-class declarations (not visible in this file).
33 constexpr int64_t proxy_session_manager::NO_SESSION_ID;
34 constexpr int64_t proxy_session_manager::SHUTDOWN_TIMEOUT_SECONDS;
// Constructs the session manager from the client context passed as `client`.
// NOTE(review): the member initializers and body are not visible here; the
// other methods use a `client_` member, presumably bound to this argument.
35 proxy_session_manager::proxy_session_manager(
36 hazelcast::client::spi::ClientContext& client)
// Acquires one use of the session belonging to `group_id`.
// Looks up (or creates) the group's session state through
// get_or_create_session() and returns the result of acquire(1) on it.
41 proxy_session_manager::acquire_session(
const raft_group_id& group_id)
43 return get_or_create_session(group_id).acquire(1);
// Acquires `count` uses of the session belonging to `group_id`.
// Looks up (or creates) the group's session state through
// get_or_create_session() and returns the result of acquire(count) on it.
// NOTE(review): the declaration of the `count` parameter is not visible here.
47 proxy_session_manager::acquire_session(
const raft_group_id& group_id,
50 return get_or_create_session(group_id).acquire(count);
// Returns a reference to the session state stored in sessions_ for
// `group_id`, creating a new session when no entry exists or the stored
// entry reports !is_valid().
// Throws client::exception::hazelcast_instance_not_active; the condition
// guarding the throw is not visible here (per the message: the manager has
// already been shut down).
53 proxy_session_manager::session_state&
54 proxy_session_manager::get_or_create_session(
const raft_group_id& group_id)
// Upgradable lock on lock_: the lookup below runs under it, and it is
// upgraded to exclusive ownership only if a session has to be created.
56 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
59 BOOST_THROW_EXCEPTION(client::exception::hazelcast_instance_not_active(
60 "proxy_session_manager::get_or_create_session",
61 "Session manager is already shut down!"));
64 auto session = sessions_.find(group_id);
65 if (session == sessions_.end() || !session->second.is_valid()) {
// Exclusive ownership is taken here; the lookup and validity check are
// repeated under it before a new session is created.
67 boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(
69 session = sessions_.find(group_id);
70 if (session == sessions_.end() || !session->second.is_valid()) {
71 session = create_new_session(group_id);
74 return session->second;
// Requests a new session for `group_id` via request_new_session(), stores
// the resulting session_state in sessions_ under that group id, and calls
// schedule_heartbeat_task() with the heartbeat interval from the response.
// The declared return type is an iterator into the sessions_ map.
// NOTE(review): the return statement is not visible here; presumably the
// iterator of the stored entry (`result.first`) is returned.
// This method itself takes no lock; the visible caller invokes it while
// holding exclusive ownership of lock_.
77 std::unordered_map<raft_group_id,
78 proxy_session_manager::session_state>::iterator
79 proxy_session_manager::create_new_session(
const raft_group_id& group_id)
82 auto response = request_new_session(group_id);
83 session_state state { response.id , response.ttl_millis };
85 auto result = sessions_.emplace( group_id, state );
// Overwrites the mapped value with the fresh state. NOTE(review): any
// condition guarding this assignment is not visible here - presumably it
// applies when emplace() found an already existing entry.
88 result.first->second = state;
90 schedule_heartbeat_task(response.heartbeat_millis);
// Builds a cpsession_createsession request from `group_id` and the client
// name, creates a ClientInvocation for it (third argument "sessionManager"),
// and decodes the response into a session_response.
// NOTE(review): the call that actually sends the invocation and obtains
// `response` is not visible here.
94 proxy_session_manager::session_response
95 proxy_session_manager::request_new_session(
const raft_group_id& group_id)
97 auto request = client::protocol::codec::cpsession_createsession_encode(
98 group_id, client_.get_name());
99 auto response = client::spi::impl::ClientInvocation::create(
100 client_, request,
"sessionManager")
// Three int64_t values are read from the response in this order:
// session id, ttl in milliseconds, heartbeat interval in milliseconds.
103 auto session_id = response.get_first_fixed_sized_field<int64_t>();
104 auto ttl_millis = response.get<int64_t>();
105 auto hearbeat_millis = response.get<int64_t>();
106 return { session_id, ttl_millis, hearbeat_millis };
// Schedules the repeating heartbeat task on the client execution service.
// `hearbeat_millis` is converted to a std::chrono::milliseconds duration.
// The scheduling happens only for the call that flips scheduled_heartbeat_
// from false to true; later calls find the flag already set and skip it.
// NOTE(review): several lines of this function are not visible here (the
// lambda headers, the arguments passed to schedule_with_repetition, the use
// of `in_use`, and the try/catch scaffolding around the handlers below);
// the comments describe only what the visible lines show.
110 proxy_session_manager::schedule_heartbeat_task(int64_t hearbeat_millis)
112 bool current =
false;
113 if (scheduled_heartbeat_.compare_exchange_strong(current,
true)) {
// Shared container holding the futures produced by the heartbeats sent
// in one run; it is cleared at the start of each run (see below).
114 auto prev_heartbeats =
115 std::make_shared<std::vector<boost::future<void>>>();
116 auto duration = std::chrono::milliseconds(hearbeat_millis);
118 client_.get_client_execution_service().schedule_with_repetition(
121 prev_heartbeats->clear();
// Snapshot (group id, session id, in-use flag) of every stored session
// while holding lock_, so the loop further down iterates over the copy.
122 std::vector<std::tuple<raft_group_id, int64_t, bool>> sessions;
124 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
125 for (
const auto& s : sessions_) {
126 sessions.emplace_back(
127 s.first, s.second.id, s.second.is_in_use());
130 for (
const auto& entry : sessions) {
131 raft_group_id group_id;
134 std::tie(group_id, session_id, in_use) = entry;
// Sends a heartbeat for the session and stores the resulting future.
// The visible handlers invalidate the session when session_expired or
// cp_group_destroyed is caught.
136 prev_heartbeats->emplace_back(
137 heartbeat(group_id, session_id)
140 [=](boost::future<client::protocol::ClientMessage>
145 client::exception::session_expired&) {
146 invalidate_session(group_id, session_id);
148 client::exception::cp_group_destroyed&) {
149 invalidate_session(group_id, session_id);
// Builds a cpsession_heartbeatsession request for (`group_id`, `session_id`)
// and creates a ClientInvocation for it (third argument "sessionManager").
// Declared to return a future of the response message.
// NOTE(review): the `session_id` parameter declaration and the call that
// turns the invocation into the returned future are not visible here.
160 boost::future<client::protocol::ClientMessage>
161 proxy_session_manager::heartbeat(
const raft_group_id& group_id,
164 auto request = client::protocol::codec::cpsession_heartbeatsession_encode(
165 group_id, session_id);
166 return client::spi::impl::ClientInvocation::create(
167 client_, request,
"sessionManager")
// Removes the entry stored in sessions_ for `group_id`, if there is one.
// The lookup runs under an upgradable lock on lock_, which is upgraded to
// exclusive ownership before the erase.
// NOTE(review): the remaining parameter(s) are not visible here, and the
// visible condition checks only that an entry exists for the group.
172 proxy_session_manager::invalidate_session(
const raft_group_id& group_id,
176 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
177 auto session = sessions_.find(group_id);
178 if (session != sessions_.end()) {
180 boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(
182 sessions_.erase(session);
// Releases one use of the given session by delegating to the counted
// overload with a count of 1.
// NOTE(review): the `session_id` parameter declaration is not visible here.
188 proxy_session_manager::release_session(
const raft_group_id& group_id,
191 release_session(group_id, session_id, 1);
// Releases `count` uses of the session stored for `group_id`, but only when
// an entry exists and its id equals `session_id`; otherwise the visible code
// does nothing. The lookup and release run while holding lock_.
// NOTE(review): the `session_id` and `count` parameter declarations are not
// visible here.
195 proxy_session_manager::release_session(
const raft_group_id& group_id,
199 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
200 auto session = sessions_.find(group_id);
201 if (session != sessions_.end() && session->second.id == session_id) {
202 session->second.release(count);
// Returns the id of the session stored for `group_id`, or NO_SESSION_ID when
// sessions_ has no entry for that group. The lookup runs while holding lock_.
207 proxy_session_manager::get_session(
const raft_group_id& group_id)
209 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
210 auto session = sessions_.find(group_id);
211 return session == sessions_.end() ? NO_SESSION_ID : session->second.id;
// Returns the id stored in thread_ids_ for the pair
// (group_id, current thread id). When no entry exists, a new id is obtained
// from generate_thread_id(), stored under that key, and returned.
215 proxy_session_manager::get_or_create_unique_thread_id(
216 const raft_group_id& group_id)
218 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
219 auto key = std::make_pair(group_id, util::get_current_thread_id());
220 auto global_thread_id_it = thread_ids_.find(key);
221 if (global_thread_id_it != thread_ids_.end()) {
222 return global_thread_id_it->second;
// The id is generated before exclusive ownership is taken below.
225 auto global_thread_id = generate_thread_id(group_id);
// After upgrading to exclusive ownership the lookup is repeated; if an
// entry is found at that point it is returned and the freshly generated
// id is not stored.
228 boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(read_lock);
229 global_thread_id_it = thread_ids_.find(key);
230 if (global_thread_id_it != thread_ids_.end()) {
231 return global_thread_id_it->second;
234 thread_ids_.emplace(std::move(key), global_thread_id);
235 return global_thread_id;
// Shuts the session manager down while holding exclusive ownership of lock_:
// cancels the heartbeat timer (when scheduled_heartbeat_ is set and
// heartbeat_timer_ is non-null), issues close_session() for every stored
// session, and waits for all of those calls for at most
// SHUTDOWN_TIMEOUT_SECONDS seconds.
// NOTE(review): lines after the wait are not visible here, so any further
// cleanup (e.g. clearing sessions_ or setting a shut-down flag) cannot be
// confirmed from this view.
239 proxy_session_manager::shutdown()
241 boost::unique_lock<boost::shared_mutex> write_lock(lock_);
242 if (scheduled_heartbeat_ && heartbeat_timer_) {
243 heartbeat_timer_->cancel();
// One close request per stored session; the futures are collected so they
// can be awaited together.
246 std::vector<boost::future<client::protocol::ClientMessage>> invocations;
247 for (
const auto& s : sessions_) {
248 invocations.emplace_back(close_session(s.first, s.second.id));
251 auto waiting_future =
252 boost::when_all(invocations.begin(), invocations.end());
// Bounded wait: the result of wait_for() is not inspected on this line.
254 waiting_future.wait_for(boost::chrono::seconds(SHUTDOWN_TIMEOUT_SECONDS));
// Builds a cpsession_closesession request for (`group_id`, `session_id`) and
// creates a ClientInvocation for it (third argument "sessionManager").
// Declared to return a future of the response message.
// NOTE(review): the `session_id` parameter declaration and the call that
// turns the invocation into the returned future are not visible here.
260 boost::future<client::protocol::ClientMessage>
261 proxy_session_manager::close_session(
const raft_group_id& group_id,
264 auto request = client::protocol::codec::cpsession_closesession_encode(
265 group_id, session_id);
266 return client::spi::impl::ClientInvocation::create(
267 client_, request,
"sessionManager")
// Returns the current acquire_count of the session stored for `group_id`
// when an entry exists and its id equals `session_id`. The lookup runs while
// holding lock_.
// NOTE(review): the `session_id` parameter declaration and the value
// returned in the non-matching case are not visible here.
272 proxy_session_manager::get_session_acquire_count(
const raft_group_id& group_id,
275 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
276 auto session = sessions_.find(group_id);
277 return session != sessions_.end() && session->second.id == session_id
278 ? session->second.acquire_count.load()
// Builds a cpsession_generatethreadid request for `group_id`, creates a
// ClientInvocation for it (third argument "sessionManager"), and returns the
// first fixed-size int64_t field of the response.
// NOTE(review): the declaration of `request` and the calls between creating
// the invocation and reading the field are not fully visible here.
283 proxy_session_manager::generate_thread_id(
const raft_group_id& group_id)
286 client::protocol::codec::cpsession_generatethreadid_encode(group_id);
287 return client::spi::impl::ClientInvocation::create(
288 client_, request,
"sessionManager")
291 .get_first_fixed_sized_field<int64_t>();
// A session state is valid when it is currently in use or has not expired;
// an in-use session is therefore reported valid even past its expiration.
295 proxy_session_manager::session_state::is_valid()
const
297 return is_in_use() || !is_expired();
// Returns true when the current acquire_count is greater than zero.
301 proxy_session_manager::session_state::is_in_use()
const
303 return acquire_count.load() > 0;
// Returns true when the steady clock has passed creation_time + ttl.
307 proxy_session_manager::session_state::is_expired()
const
309 auto expirationTime = creation_time + ttl;
// A negative time-since-epoch is replaced by the maximum time point, so
// such a session never compares as expired. NOTE(review): presumably this
// guards against the sum above wrapping around for very large ttl values.
310 if (expirationTime.time_since_epoch().count() < 0) {
311 expirationTime = (std::chrono::steady_clock::time_point::max)();
313 return std::chrono::steady_clock::now() > expirationTime;
// Constructs a session state for the given session id; creation_time is
// initialised to the current steady-clock time.
// NOTE(review): the remaining parameter(s) and member initializers are not
// visible here.
316 proxy_session_manager::session_state::session_state(int64_t
id,
320 , creation_time(std::chrono::steady_clock::now())
// Copy constructor: copies creation_time from `rhs` and initialises
// acquire_count with the value read from rhs.acquire_count via load().
// NOTE(review): the initializers for the other members are not visible here.
323 proxy_session_manager::session_state::session_state(
const session_state& rhs)
326 , creation_time(rhs.creation_time)
327 , acquire_count(rhs.acquire_count.load())
// Copy assignment: assigns creation_time from `rhs` and stores the value
// read from rhs.acquire_count via load() into acquire_count.
// NOTE(review): the assignments of the other members and the return
// statement are not visible here.
330 proxy_session_manager::session_state&
331 proxy_session_manager::session_state::operator=(
const session_state& rhs)
335 creation_time = rhs.creation_time;
336 acquire_count = rhs.acquire_count.load();
// Adds `count` to acquire_count via fetch_add().
// NOTE(review): the return type and return statement are not visible here.
342 proxy_session_manager::session_state::acquire(int32_t count)
344 acquire_count.fetch_add(count);
// Subtracts `count` from acquire_count via fetch_sub(). The visible code
// performs no check against the counter going below zero.
349 proxy_session_manager::session_state::release(int32_t count)
351 acquire_count.fetch_sub(count);