1 #include <hazelcast/client/spi/ClientContext.h>
18 #include <boost/thread/shared_lock_guard.hpp>
19 #include <boost/container_hash/hash.hpp>
21 #include "hazelcast/cp/cp_impl.h"
22 #include "hazelcast/cp/cp.h"
23 #include "hazelcast/client/exception/protocol_exceptions.h"
24 #include "hazelcast/client/protocol/codec/codecs.h"
25 #include "hazelcast/client/spi/impl/ClientInvocation.h"
26 #include "hazelcast/client/spi/impl/ClientExecutionServiceImpl.h"
27 #include "hazelcast/client/proxy/SerializingProxy.h"
33 constexpr int64_t proxy_session_manager::NO_SESSION_ID;
34 constexpr int64_t proxy_session_manager::SHUTDOWN_TIMEOUT_SECONDS;
35 proxy_session_manager::proxy_session_manager(
36 hazelcast::client::spi::ClientContext &client) : client_(client) {}
38 int64_t proxy_session_manager::acquire_session(
const raft_group_id &group_id) {
39 return get_or_create_session(group_id).acquire(1);
42 int64_t proxy_session_manager::acquire_session(
const raft_group_id &group_id, int32_t count) {
43 return get_or_create_session(group_id).acquire(count);
46 proxy_session_manager::session_state &
47 proxy_session_manager::get_or_create_session(
const raft_group_id &group_id) {
48 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
51 BOOST_THROW_EXCEPTION(client::exception::hazelcast_instance_not_active(
52 "proxy_session_manager::get_or_create_session",
53 "Session manager is already shut down!"));
56 auto session = sessions_.find(group_id);
57 if (session == sessions_.end() || !session->second.is_valid()) {
59 boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(read_lock);
60 session = sessions_.find(group_id);
61 if (session == sessions_.end() || !session->second.is_valid()) {
62 session = create_new_session(group_id);
65 return session->second;
68 std::unordered_map<raft_group_id, proxy_session_manager::session_state>::iterator
69 proxy_session_manager::create_new_session(
const raft_group_id &group_id) {
71 auto response = request_new_session(group_id);
72 auto result = sessions_.emplace(group_id, session_state{response.id, response.ttl_millis});
73 assert(result.second);
74 auto session = sessions_.emplace(group_id, session_state{response.id, response.ttl_millis}).first;
75 schedule_heartbeat_task(response.heartbeat_millis);
79 proxy_session_manager::session_response
80 proxy_session_manager::request_new_session(
const raft_group_id &group_id) {
81 auto request = client::protocol::codec::cpsession_createsession_encode(group_id,
83 auto response = client::spi::impl::ClientInvocation::create(client_, request,
84 "sessionManager")->invoke().get();
85 auto session_id = response.get_first_fixed_sized_field<int64_t>();
86 auto ttl_millis = response.get<int64_t>();
87 auto hearbeat_millis = response.get<int64_t>();
88 return {session_id, ttl_millis, hearbeat_millis};
91 void proxy_session_manager::schedule_heartbeat_task(int64_t hearbeat_millis) {
93 if (scheduled_heartbeat_.compare_exchange_strong(current,
true)) {
94 auto prev_heartbeats = std::make_shared<std::vector<boost::future<void>>>();
95 auto duration = std::chrono::milliseconds(hearbeat_millis);
96 heartbeat_timer_ = client_.get_client_execution_service().schedule_with_repetition([=]() {
98 prev_heartbeats->clear();
99 std::vector<std::tuple<raft_group_id, int64_t, bool>> sessions;
101 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
102 for (
const auto &s : sessions_) {
103 sessions.emplace_back(s.first, s.second.id, s.second.is_in_use());
106 for (
const auto &entry : sessions) {
107 raft_group_id group_id;
110 std::tie(group_id, session_id, in_use) = entry;
112 prev_heartbeats->emplace_back(
113 heartbeat(group_id, session_id).then(boost::launch::sync,
114 [=](boost::future<client::protocol::ClientMessage> f) {
118 client::exception::session_expired &) {
119 invalidate_session(group_id,
122 client::exception::cp_group_destroyed &) {
123 invalidate_session(group_id,
129 }, duration, duration);
133 boost::future<client::protocol::ClientMessage>
134 proxy_session_manager::heartbeat(
const raft_group_id &group_id, int64_t session_id) {
135 auto request = client::protocol::codec::cpsession_heartbeatsession_encode(group_id, session_id);
136 return client::spi::impl::ClientInvocation::create(client_, request,
"sessionManager")->invoke();
139 void proxy_session_manager::invalidate_session(
const raft_group_id &group_id, int64_t session_id) {
141 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
142 auto session = sessions_.find(group_id);
143 if (session != sessions_.end()) {
145 boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(read_lock);
146 sessions_.erase(session);
151 void proxy_session_manager::release_session(
const raft_group_id &group_id, int64_t session_id) {
152 release_session(group_id, session_id, 1);
155 void proxy_session_manager::release_session(
const raft_group_id &group_id, int64_t session_id,
157 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
158 auto session = sessions_.find(group_id);
159 if (session != sessions_.end() && session->second.id == session_id) {
160 session->second.release(count);
164 int64_t proxy_session_manager::get_session(
const raft_group_id &group_id) {
165 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
166 auto session = sessions_.find(group_id);
167 return session == sessions_.end() ? NO_SESSION_ID : session->second.id;
170 int64_t proxy_session_manager::get_or_create_unique_thread_id(
const raft_group_id &group_id) {
171 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
172 auto key = std::make_pair(group_id, util::get_current_thread_id());
173 auto global_thread_id_it = thread_ids_.find(key);
174 if (global_thread_id_it != thread_ids_.end()) {
175 return global_thread_id_it->second;
178 auto global_thread_id = generate_thread_id(group_id);
181 boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(read_lock);
182 global_thread_id_it = thread_ids_.find(key);
183 if (global_thread_id_it != thread_ids_.end()) {
184 return global_thread_id_it->second;
187 thread_ids_.emplace(std::move(key), global_thread_id);
188 return global_thread_id;
191 void proxy_session_manager::shutdown() {
192 boost::unique_lock<boost::shared_mutex> write_lock(lock_);
193 if (scheduled_heartbeat_ && heartbeat_timer_) {
194 heartbeat_timer_->cancel();
197 std::vector<boost::future<client::protocol::ClientMessage>> invocations;
198 for (
const auto &s : sessions_) {
199 invocations.emplace_back(close_session(s.first, s.second.id));
202 auto waiting_future = boost::when_all(invocations.begin(), invocations.end());
204 waiting_future.wait_for(boost::chrono::seconds(SHUTDOWN_TIMEOUT_SECONDS));
210 boost::future<client::protocol::ClientMessage>
211 proxy_session_manager::close_session(
const raft_group_id &group_id, int64_t session_id) {
212 auto request = client::protocol::codec::cpsession_closesession_encode(group_id, session_id);
213 return client::spi::impl::ClientInvocation::create(client_, request,
"sessionManager")->invoke();
216 int64_t proxy_session_manager::get_session_acquire_count(
const raft_group_id &group_id, int64_t session_id) {
217 boost::upgrade_lock<boost::shared_mutex> read_lock(lock_);
218 auto session = sessions_.find(group_id);
219 return session != sessions_.end() && session->second.id == session_id ? session->second.acquire_count.load() : 0;
222 int64_t proxy_session_manager::generate_thread_id(
const raft_group_id &group_id) {
223 auto request = client::protocol::codec::cpsession_generatethreadid_encode(group_id);
224 return client::spi::impl::ClientInvocation::create(client_, request,
225 "sessionManager")->invoke().get().get_first_fixed_sized_field<int64_t>();
228 bool proxy_session_manager::session_state::is_valid()
const {
229 return is_in_use() || !is_expired();
232 bool proxy_session_manager::session_state::is_in_use()
const {
233 return acquire_count.load() > 0;
236 bool proxy_session_manager::session_state::is_expired()
const {
237 auto expirationTime = creation_time + ttl;
238 if (expirationTime.time_since_epoch().count() < 0) {
239 expirationTime = (std::chrono::steady_clock::time_point::max)();
241 return std::chrono::steady_clock::now() > expirationTime;
244 proxy_session_manager::session_state::session_state(int64_t
id, int64_t ttl_millis)
245 : id(id), ttl(ttl_millis), creation_time(std::chrono::steady_clock::now()) {}
247 proxy_session_manager::session_state::session_state(
const session_state &rhs)
248 : id(rhs.id), ttl(rhs.ttl), creation_time(rhs.creation_time),
249 acquire_count(rhs.acquire_count.load()) {}
251 int64_t proxy_session_manager::session_state::acquire(int32_t count) {
252 acquire_count.fetch_add(count);
256 void proxy_session_manager::session_state::release(int32_t count) {
257 acquire_count.fetch_sub(count);