Hazelcast C++ Client
Hazelcast C++ Client Library
Loading...
Searching...
No Matches
hazelcast::cp::session_semaphore Class Reference
Inheritance diagram for hazelcast::cp::session_semaphore:

Public Member Functions

 session_semaphore (const std::string &proxy_name, client::spi::ClientContext *context, const raft_group_id &group_id, const std::string &object_name, internal::session::proxy_session_manager &session_manager)
boost::future< void > acquire (int32_t permits) override
boost::future< void > release (int32_t permits) override
boost::future< int32_t > drain_permits () override

Protected Member Functions

boost::future< bool > try_acquire_for_millis (int32_t permits, std::chrono::milliseconds timeout) override
void throw_illegal_state_exception (std::exception_ptr e)
int64_t get_thread_id () override
boost::future< void > do_change_permits (int32_t delta) override

Static Protected Attributes

static constexpr int32_t DRAIN_SESSION_ACQ_COUNT = 1024
 Since a proxy does not know how many permits will be drained on the Raft group, it uses this constant to increment its local session acquire count.

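Detailed Description

A counting_semaphore variant whose operations are tied to a session: acquire, drain_permits and do_change_permits obtain a session from the proxy_session_manager and send its id with the request, while release uses the existing session and fails with an illegal_state exception ("No valid session!") when there is none.

Definition at line 1278 of file cp.h.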

Constructor & Destructor Documentation

◆ session_semaphore()

hazelcast::cp::session_semaphore::session_semaphore ( const std::string & proxy_name,
client::spi::ClientContext * context,
const raft_group_id & group_id,
const std::string & object_name,
internal::session::proxy_session_manager & session_manager )

Definition at line 1172 of file cp.cpp.

1178 : counting_semaphore(proxy_name,
1179 context,
1180 group_id,
1181 object_name,
1182 session_manager)
1183{}

Member Function Documentation

◆ acquire()

boost::future< void > hazelcast::cp::session_semaphore::acquire ( int32_t permits)
override

Definition at line 1186 of file cp.cpp.

1187{
1188 return to_void_future(
1189 try_acquire_for_millis(permits, std::chrono::milliseconds(-1)));
1190}

◆ do_change_permits()

boost::future< void > hazelcast::cp::session_semaphore::do_change_permits ( int32_t delta)
override protected

Definition at line 1355 of file cp.cpp.

1356{
1357 auto session_id =
1358 session_manager_.acquire_session(group_id_, DRAIN_SESSION_ACQ_COUNT);
1359 auto thread_id = get_thread_id();
1360 auto invocation_uid =
1361 get_context().get_hazelcast_client_implementation()->random_uuid();
1362
1363 auto request = client::protocol::codec::semaphore_change_encode(
1364 group_id_, object_name_, session_id, thread_id, invocation_uid, delta);
1365 return client::spi::impl::ClientInvocation::create(
1366 context_, request, object_name_)
1367 ->invoke()
1368 .then(boost::launch::sync,
1369 [=](boost::future<client::protocol::ClientMessage> f) {
1370 try {
1371 f.get();
1372 session_manager_.release_session(group_id_, session_id);
1373 } catch (client::exception::session_expired&) {
1374 session_manager_.invalidate_session(group_id_, session_id);
1375 throw_illegal_state_exception(std::current_exception());
1376 }
1377 });
1378}
static constexpr int32_t DRAIN_SESSION_ACQ_COUNT
Since a proxy does not know how many permits will be drained on the Raft group, it uses this constant to increment its local session acquire count.
Definition at line 1301 of file cp.h.

◆ drain_permits()

boost::future< int32_t > hazelcast::cp::session_semaphore::drain_permits ( )
override

Definition at line 1313 of file cp.cpp.

1314{
1315 auto thread_id = get_thread_id();
1316 auto invocation_uid =
1317 get_context().get_hazelcast_client_implementation()->random_uuid();
1318
1319 auto do_drain_once = ([=]() {
1320 auto session_id =
1321 session_manager_.acquire_session(group_id_, DRAIN_SESSION_ACQ_COUNT);
1322 auto request = client::protocol::codec::semaphore_drain_encode(
1323 group_id_, object_name_, session_id, thread_id, invocation_uid);
1324 return client::spi::impl::ClientInvocation::create(
1325 context_, request, object_name_)
1326 ->invoke()
1327 .then(
1328 boost::launch::sync,
1329 [=](boost::future<client::protocol::ClientMessage> f) {
1330 try {
1331 auto count = f.get().get_first_fixed_sized_field<int32_t>();
1332 session_manager_.release_session(
1333 group_id_, session_id, DRAIN_SESSION_ACQ_COUNT - count);
1334 return count;
1335 } catch (client::exception::session_expired&) {
1336 session_manager_.invalidate_session(group_id_, session_id);
1337 return -1;
1338 }
1339 });
1340 });
1341
1342 return do_drain_once().then(
1343 boost::launch::sync, [=](boost::future<int32_t> f) {
1344 int32_t count = f.get();
1345 if (count != -1) {
1346 return count;
1347 }
1348 while ((count = do_drain_once().get()) == -1) {
1349 }
1350 return count;
1351 });
1352}

◆ get_thread_id()

int64_t hazelcast::cp::session_semaphore::get_thread_id ( )
override protected

Definition at line 1307 of file cp.cpp.

1308{
1309 return util::get_current_thread_id();
1310}

◆ release()

boost::future< void > hazelcast::cp::session_semaphore::release ( int32_t permits)
override

Definition at line 1269 of file cp.cpp.

1270{
1271 util::Preconditions::check_positive(permits, "Permits must be positive!");
1272 auto session_id = session_manager_.get_session(group_id_);
1273 if (session_id == internal::session::proxy_session_manager::NO_SESSION_ID) {
1274 throw_illegal_state_exception(nullptr);
1275 }
1276
1277 auto thread_id = get_thread_id();
1278 return do_release(permits, thread_id, session_id)
1279 .then([=](boost::future<void> f) {
1280 try {
1281 f.get();
1282 session_manager_.release_session(group_id_, session_id, permits);
1283 } catch (client::exception::session_expired&) {
1284 session_manager_.invalidate_session(group_id_, session_id);
1285 session_manager_.release_session(group_id_, session_id, permits);
1286 throw_illegal_state_exception(std::current_exception());
1287 }
1288 });
1289}

◆ throw_illegal_state_exception()

void hazelcast::cp::session_semaphore::throw_illegal_state_exception ( std::exception_ptr e)
protected

Definition at line 1292 of file cp.cpp.

1293{
1294 auto ise = boost::enable_current_exception(client::exception::illegal_state(
1295 "session_semaphore::illegal_state", "No valid session!"));
1296 if (!e) {
1297 throw ise;
1298 }
1299 try {
1300 std::rethrow_exception(e);
1301 } catch (...) {
1302 std::throw_with_nested(ise);
1303 }
1304}

◆ try_acquire_for_millis()

boost::future< bool > hazelcast::cp::session_semaphore::try_acquire_for_millis ( int32_t permits,
std::chrono::milliseconds timeout )
override protected

Definition at line 1193 of file cp.cpp.

1195{
1196 util::Preconditions::check_not_negative(
1197 permits, "permits must not be negative number.");
1198
1199 auto thread_id = get_thread_id();
1200 auto invocation_uid =
1201 get_context().get_hazelcast_client_implementation()->random_uuid();
1202
1203 auto do_try_acquire_once = ([=]() {
1204 auto start = std::chrono::steady_clock::now();
1205 auto use_timeout = timeout >= std::chrono::milliseconds::zero();
1206 auto session_id = session_manager_.acquire_session(group_id_, permits);
1207 auto request =
1208 client::protocol::codec::semaphore_acquire_encode(group_id_,
1209 object_name_,
1210 session_id,
1211 thread_id,
1212 invocation_uid,
1213 permits,
1214 timeout.count());
1215 return client::spi::impl::ClientInvocation::create(
1216 context_, request, object_name_)
1217 ->invoke()
1218 .then(
1219 boost::launch::sync,
1220 [=](boost::future<client::protocol::ClientMessage> f) {
1221 try {
1222 auto acquired = f.get().get_first_fixed_sized_field<bool>();
1223 if (!acquired) {
1224 session_manager_.release_session(group_id_, session_id);
1225 }
1226 // first bool means acquired or not, second bool means if
1227 // should try again
1228 return std::make_pair(acquired, false);
1229 } catch (client::exception::session_expired&) {
1230 session_manager_.invalidate_session(group_id_, session_id);
1231 if (use_timeout &&
1232 (timeout - (std::chrono::steady_clock::now() - start) <=
1233 std::chrono::milliseconds::zero())) {
1234 return std::make_pair(false, false);
1235 }
1236 return std::make_pair(false, true);
1237 } catch (client::exception::wait_key_cancelled&) {
1238 session_manager_.release_session(
1239 group_id_, session_id, permits);
1240 if (!use_timeout) {
1241 BOOST_THROW_EXCEPTION(client::exception::illegal_state(
1242 "session_semaphore::try_acquire_for_millis",
1243 (boost::format(
1244 "Semaphore[%1%] not acquired because the acquire "
1245 "call on the CP group is cancelled, possibly "
1246 "because of another indeterminate call from the "
1247 "same thread.") %
1248 object_name_)
1249 .str()));
1250 }
1251 return std::make_pair(false, false);
1252 }
1253 });
1254 });
1255
1256 return do_try_acquire_once().then(
1257 boost::launch::sync, [=](boost::future<std::pair<bool, bool>> f) {
1258 auto result = f.get();
1259 if (!result.second) {
1260 return result.first;
1261 }
1262 for (; result.second; result = do_try_acquire_once().get())
1263 ;
1264 return result.first;
1265 });
1266}

Member Data Documentation

◆ DRAIN_SESSION_ACQ_COUNT

int32_t hazelcast::cp::session_semaphore::DRAIN_SESSION_ACQ_COUNT = 1024
static constexpr protected

Since a proxy does not know how many permits will be drained on the Raft group, it uses this constant to increment its local session acquire count.

Then, it adjusts the local session acquire count after the drain response is returned.

Definition at line 1301 of file cp.h.


The documentation for this class was generated from the following files:
  • hazelcast/include/hazelcast/cp/cp.h
  • hazelcast/src/hazelcast/cp/cp.cpp