17 #include <boost/uuid/uuid_io.hpp>
19 #include <hazelcast/client/txn/client_transaction_util.h>
20 #include "hazelcast/client/txn/TransactionProxy.h"
21 #include "hazelcast/client/transaction_options.h"
22 #include "hazelcast/util/Util.h"
23 #include "hazelcast/client/spi/impl/ClientInvocation.h"
24 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
25 #include "hazelcast/client/proxy/TransactionalMapImpl.h"
26 #include "hazelcast/client/proxy/TransactionalMultiMapImpl.h"
27 #include "hazelcast/client/proxy/TransactionalListImpl.h"
28 #include "hazelcast/client/proxy/TransactionalQueueImpl.h"
29 #include "hazelcast/client/proxy/TransactionalSetImpl.h"
30 #include "hazelcast/client/imap.h"
31 #include "hazelcast/client/multi_map.h"
32 #include "hazelcast/client/ilist.h"
33 #include "hazelcast/client/iqueue.h"
34 #include "hazelcast/client/iset.h"
35 #include "hazelcast/client/transaction_context.h"
36 #include "hazelcast/client/spi/impl/ClientTransactionManagerServiceImpl.h"
37 #include "hazelcast/client/protocol/codec/codecs.h"
42 TransactionProxy::TransactionProxy(transaction_options &txn_options, spi::ClientContext &client_context,
43 std::shared_ptr<connection::Connection> connection)
44 : options_(txn_options), client_context_(client_context), connection_(connection),
45 thread_id_(util::get_current_thread_id()), state_(TxnState::NO_TXN) {}
47 TransactionProxy::TransactionProxy(
const TransactionProxy &rhs) : options_(rhs.options_),
48 client_context_(rhs.client_context_),
49 connection_(rhs.connection_),
50 thread_id_(rhs.thread_id_), txn_id_(rhs.txn_id_),
52 start_time_(rhs.start_time_) {
53 transaction_exists_.store(rhs.transaction_exists_.load());
56 boost::uuids::uuid TransactionProxy::get_txn_id()
const {
60 TxnState TransactionProxy::get_state()
const {
64 std::chrono::milliseconds TransactionProxy::get_timeout()
const {
65 return options_.get_timeout();
68 boost::future<void> TransactionProxy::begin() {
70 if (state_ == TxnState::ACTIVE) {
71 BOOST_THROW_EXCEPTION(exception::illegal_state(
"TransactionProxy::begin()",
72 "Transaction is already active"));
75 if (transaction_exists_) {
76 BOOST_THROW_EXCEPTION(exception::illegal_state(
"TransactionProxy::begin()",
77 "Nested transactions are not allowed!"));
79 transaction_exists_.store(
true);
80 start_time_ = std::chrono::steady_clock::now();
81 auto request = protocol::codec::transaction_create_encode(
82 std::chrono::duration_cast<std::chrono::milliseconds>(get_timeout()).count(), options_.get_durability(),
83 static_cast<int32_t
>(options_.get_transaction_type()), thread_id_);
84 return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
88 msg.rd_ptr(msg.RESPONSE_HEADER_LEN);
89 this->txn_id_ = msg.get<boost::uuids::uuid>();
90 this->state_ = TxnState::ACTIVE;
91 }
catch (exception::iexception &) {
92 transaction_exists_.store(
false);
96 }
catch (exception::iexception &) {
97 transaction_exists_.store(
false);
102 boost::future<void> TransactionProxy::commit() {
104 if (state_ != TxnState::ACTIVE) {
105 BOOST_THROW_EXCEPTION(exception::illegal_state(
"TransactionProxy::commit()",
106 "Transaction is not active"));
108 state_ = TxnState::COMMITTING;
112 auto request = protocol::codec::transaction_commit_encode(txn_id_, thread_id_);
113 return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
116 state_ = TxnState::COMMITTED;
117 }
catch (exception::iexception &) {
118 transaction_exists_.store(
false);
119 client_transaction_util::transaction_exception_factory()->rethrow(std::current_exception(),
120 "TransactionProxy::commit() failed");
124 state_ = TxnState::COMMIT_FAILED;
125 transaction_exists_.store(
false);
126 client_transaction_util::transaction_exception_factory()->rethrow(std::current_exception(),
127 "TransactionProxy::commit() failed");
128 return boost::make_ready_future();
132 boost::future<void> TransactionProxy::rollback() {
134 if (state_ == TxnState::NO_TXN || state_ == TxnState::ROLLED_BACK) {
135 BOOST_THROW_EXCEPTION(exception::illegal_state(
"TransactionProxy::rollback()",
136 "Transaction is not active"));
138 state_ = TxnState::ROLLING_BACK;
141 auto request = protocol::codec::transaction_rollback_encode(txn_id_, thread_id_);
142 return invoke(request).then(boost::launch::sync,
143 [=](boost::future<protocol::ClientMessage> f) {
145 state_ = TxnState::ROLLED_BACK;
146 transaction_exists_.store(
false);
148 }
catch (exception::iexception &e) {
149 HZ_LOG(client_context_.get_logger(), warning,
150 boost::str(boost::format(
151 "Exception while rolling back the transaction. "
157 }
catch (exception::iexception &exception) {
158 HZ_LOG(client_context_.get_logger(), warning,
159 boost::str(boost::format(
"Exception while rolling back the transaction. "
164 state_ = TxnState::ROLLED_BACK;
165 transaction_exists_.store(
false);
166 }
catch (exception::iexception &) {
167 transaction_exists_.store(
false);
168 client_transaction_util::transaction_exception_factory()->rethrow(std::current_exception(),
169 "TransactionProxy::rollback() failed");
171 return boost::make_ready_future();
174 serialization::pimpl::SerializationService &TransactionProxy::get_serialization_service() {
175 return client_context_.get_serialization_service();
178 std::shared_ptr<connection::Connection> TransactionProxy::get_connection() {
182 void TransactionProxy::check_thread() {
183 if (thread_id_ != util::get_current_thread_id()) {
184 BOOST_THROW_EXCEPTION(exception::illegal_state(
"TransactionProxy::checkThread()",
185 "Transaction cannot span multiple threads!"));
189 void TransactionProxy::check_timeout() {
190 if (start_time_ + options_.get_timeout() < std::chrono::steady_clock::now()) {
191 BOOST_THROW_EXCEPTION(exception::transaction(
"TransactionProxy::checkTimeout()",
192 "Transaction is timed-out!"));
196 TxnState::TxnState(state value)
201 values[2] = PREPARING;
202 values[3] = PREPARED;
203 values[4] = COMMITTING;
204 values[5] = COMMITTED;
205 values[6] = COMMIT_FAILED;
206 values[7] = ROLLING_BACK;
207 values[8] = ROLLED_BACK;
210 TxnState::operator int()
const {
214 void TxnState::operator=(
int i) {
218 boost::future<protocol::ClientMessage> TransactionProxy::invoke(protocol::ClientMessage &request) {
219 return client_transaction_util::invoke(request, boost::uuids::to_string(get_txn_id()), client_context_, connection_);
222 spi::ClientContext &TransactionProxy::get_client_context()
const {
223 return client_context_;
226 const std::shared_ptr<util::exception_util::runtime_exception_factory> client_transaction_util::exceptionFactory(
227 new class transaction_exception_factory());
229 boost::future<protocol::ClientMessage>
230 client_transaction_util::invoke(protocol::ClientMessage &request,
231 const std::string &object_name,
232 spi::ClientContext &client,
233 const std::shared_ptr<connection::Connection> &connection) {
235 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation = spi::impl::ClientInvocation::create(
236 client, request, object_name, connection);
237 return clientInvocation->invoke();
238 }
catch (exception::iexception &) {
239 transaction_exception_factory()->rethrow(std::current_exception(),
240 "ClientTransactionUtil::invoke failed");
241 return boost::make_ready_future(protocol::ClientMessage(0));
245 const std::shared_ptr<util::exception_util::runtime_exception_factory> &
246 client_transaction_util::transaction_exception_factory() {
247 return exceptionFactory;
251 client_transaction_util::transaction_exception_factory::rethrow(std::exception_ptr throwable,
252 const std::string &message) {
254 std::rethrow_exception(throwable);
256 std::throw_with_nested(
257 boost::enable_current_exception(
258 exception::transaction(
"transaction_exceptionFactory::create", message)));
264 TransactionalMapImpl::TransactionalMapImpl(
const std::string &name, txn::TransactionProxy &transaction_proxy)
265 : TransactionalObject(imap::SERVICE_NAME, name, transaction_proxy) {}
267 boost::future<bool> TransactionalMapImpl::contains_key_data(
const serialization::pimpl::data &key) {
268 auto request = protocol::codec::transactionalmap_containskey_encode(
269 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
271 return invoke_and_get_future<bool>(request);
274 boost::future<boost::optional<serialization::pimpl::data>>
275 TransactionalMapImpl::get_data(
const serialization::pimpl::data &key) {
276 auto request = protocol::codec::transactionalmap_get_encode(
277 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
279 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
283 boost::future<int> TransactionalMapImpl::size() {
284 auto request = protocol::codec::transactionalmap_size_encode(
285 get_name(), get_transaction_id(), util::get_current_thread_id());
287 return invoke_and_get_future<int>(request);
290 boost::future<bool> TransactionalMapImpl::is_empty() {
291 auto request = protocol::codec::transactionalmap_isempty_encode(
292 get_name(), get_transaction_id(), util::get_current_thread_id());
294 return invoke_and_get_future<bool>(
298 boost::future<boost::optional<serialization::pimpl::data>> TransactionalMapImpl::put_data(
299 const serialization::pimpl::data &key,
const serialization::pimpl::data &value) {
301 auto request = protocol::codec::transactionalmap_put_encode(
302 get_name(), get_transaction_id(), util::get_current_thread_id(), key, value,
303 std::chrono::duration_cast<std::chrono::milliseconds>(get_timeout()).count());
305 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
310 TransactionalMapImpl::set_data(
const serialization::pimpl::data &key,
const serialization::pimpl::data &value) {
311 auto request = protocol::codec::transactionalmap_set_encode(
312 get_name(), get_transaction_id(), util::get_current_thread_id(), key, value);
314 return to_void_future(invoke(request));
317 boost::future<boost::optional<serialization::pimpl::data>>
318 TransactionalMapImpl::put_if_absent_data(
const serialization::pimpl::data &key,
319 const serialization::pimpl::data &value) {
320 auto request = protocol::codec::transactionalmap_putifabsent_encode(
321 get_name(), get_transaction_id(), util::get_current_thread_id(), key, value);
323 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
327 boost::future<boost::optional<serialization::pimpl::data>>
328 TransactionalMapImpl::replace_data(
const serialization::pimpl::data &key,
329 const serialization::pimpl::data &value) {
330 auto request = protocol::codec::transactionalmap_replace_encode(
331 get_name(), get_transaction_id(), util::get_current_thread_id(), key, value);
333 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
337 boost::future<bool> TransactionalMapImpl::replace_data(
const serialization::pimpl::data &key,
338 const serialization::pimpl::data &old_value,
339 const serialization::pimpl::data &new_value) {
340 auto request = protocol::codec::transactionalmap_replaceifsame_encode(
341 get_name(), get_transaction_id(), util::get_current_thread_id(), key, old_value, new_value);
343 return invoke_and_get_future<bool>(
347 boost::future<boost::optional<serialization::pimpl::data>>
348 TransactionalMapImpl::remove_data(
const serialization::pimpl::data &key) {
349 auto request = protocol::codec::transactionalmap_remove_encode(
350 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
352 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
356 boost::future<void> TransactionalMapImpl::delete_entry_data(
const serialization::pimpl::data &key) {
357 auto request = protocol::codec::transactionalmap_delete_encode(
358 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
360 return to_void_future(invoke(request));
363 boost::future<bool> TransactionalMapImpl::remove_data(
const serialization::pimpl::data &key,
364 const serialization::pimpl::data &value) {
365 auto request = protocol::codec::transactionalmap_removeifsame_encode(
366 get_name(), get_transaction_id(), util::get_current_thread_id(), key, value);
368 return invoke_and_get_future<bool>(
372 boost::future<std::vector<serialization::pimpl::data>> TransactionalMapImpl::key_set_data() {
373 auto request = protocol::codec::transactionalmap_keyset_encode(
374 get_name(), get_transaction_id(), util::get_current_thread_id());
376 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
380 boost::future<std::vector<serialization::pimpl::data>>
381 TransactionalMapImpl::key_set_data(
const serialization::pimpl::data &predicate) {
382 auto request = protocol::codec::transactionalmap_keysetwithpredicate_encode(
383 get_name(), get_transaction_id(), util::get_current_thread_id(), predicate);
385 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
389 boost::future<std::vector<serialization::pimpl::data>> TransactionalMapImpl::values_data() {
390 auto request = protocol::codec::transactionalmap_values_encode(
391 get_name(), get_transaction_id(), util::get_current_thread_id());
393 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
397 boost::future<std::vector<serialization::pimpl::data>>
398 TransactionalMapImpl::values_data(
const serialization::pimpl::data &predicate) {
399 auto request = protocol::codec::transactionalmap_valueswithpredicate_encode(
400 get_name(), get_transaction_id(), util::get_current_thread_id(), predicate);
402 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
406 TransactionalMultiMapImpl::TransactionalMultiMapImpl(
const std::string &name,
407 txn::TransactionProxy &transaction_proxy)
408 : TransactionalObject(multi_map::SERVICE_NAME, name, transaction_proxy) {}
410 boost::future<bool> TransactionalMultiMapImpl::put_data(
const serialization::pimpl::data &key,
411 const serialization::pimpl::data &value) {
412 auto request = protocol::codec::transactionalmultimap_put_encode(
413 get_name(), get_transaction_id(), util::get_current_thread_id(), key, value);
415 return invoke_and_get_future<bool>(
419 boost::future<std::vector<serialization::pimpl::data>> TransactionalMultiMapImpl::get_data(
420 const serialization::pimpl::data &key) {
421 auto request = protocol::codec::transactionalmultimap_get_encode(
422 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
424 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
428 boost::future<bool> TransactionalMultiMapImpl::remove(
const serialization::pimpl::data &key,
429 const serialization::pimpl::data &value) {
430 auto request = protocol::codec::transactionalmultimap_removeentry_encode(
431 get_name(), get_transaction_id(), util::get_current_thread_id(), key, value);
433 return invoke_and_get_future<bool>(
437 boost::future<std::vector<serialization::pimpl::data>> TransactionalMultiMapImpl::remove_data(
438 const serialization::pimpl::data &key) {
439 auto request = protocol::codec::transactionalmultimap_remove_encode(
440 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
442 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
446 boost::future<int> TransactionalMultiMapImpl::value_count(
const serialization::pimpl::data &key) {
447 auto request = protocol::codec::transactionalmultimap_valuecount_encode(
448 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
450 return invoke_and_get_future<int>(
454 boost::future<int> TransactionalMultiMapImpl::size() {
455 auto request = protocol::codec::transactionalmultimap_size_encode(
456 get_name(), get_transaction_id(), util::get_current_thread_id());
458 return invoke_and_get_future<int>(
462 TransactionalListImpl::TransactionalListImpl(
const std::string &object_name, txn::TransactionProxy &context)
463 : TransactionalObject(ilist::SERVICE_NAME, object_name, context) {}
465 boost::future<bool> TransactionalListImpl::add(
const serialization::pimpl::data &e) {
466 auto request = protocol::codec::transactionallist_add_encode(
467 get_name(), get_transaction_id(), util::get_current_thread_id(), e);
469 return invoke_and_get_future<bool>(
473 boost::future<bool> TransactionalListImpl::remove(
const serialization::pimpl::data &e) {
474 auto request = protocol::codec::transactionallist_remove_encode(
475 get_name(), get_transaction_id(), util::get_current_thread_id(), e);
477 return invoke_and_get_future<bool>(
481 boost::future<int> TransactionalListImpl::size() {
482 auto request = protocol::codec::transactionallist_size_encode(
483 get_name(), get_transaction_id(), util::get_current_thread_id());
485 return invoke_and_get_future<int>(
489 TransactionalSetImpl::TransactionalSetImpl(
const std::string &name, txn::TransactionProxy &transaction_proxy)
490 : TransactionalObject(iset::SERVICE_NAME, name, transaction_proxy) {}
492 boost::future<bool> TransactionalSetImpl::add_data(
const serialization::pimpl::data &e) {
493 auto request = protocol::codec::transactionalset_add_encode(
494 get_name(), get_transaction_id(), util::get_current_thread_id(), e);
496 return invoke_and_get_future<bool>(request);
499 boost::future<bool> TransactionalSetImpl::remove_data(
const serialization::pimpl::data &e) {
500 auto request = protocol::codec::transactionalset_remove_encode(
501 get_name(), get_transaction_id(), util::get_current_thread_id(), e);
503 return invoke_and_get_future<bool>(
507 boost::future<int> TransactionalSetImpl::size() {
508 auto request = protocol::codec::transactionalset_size_encode(
509 get_name(), get_transaction_id(), util::get_current_thread_id());
511 return invoke_and_get_future<int>(request);
514 TransactionalObject::TransactionalObject(
const std::string &service_name,
const std::string &object_name,
515 txn::TransactionProxy &context)
516 : proxy::SerializingProxy(context.get_client_context(), object_name), service_name_(service_name),
517 name_(object_name), context_(context) {}
519 TransactionalObject::~TransactionalObject() =
default;
521 const std::string &TransactionalObject::get_service_name() {
522 return service_name_;
525 const std::string &TransactionalObject::get_name() {
529 boost::future<void> TransactionalObject::destroy() {
531 auto request = protocol::codec::client_destroyproxy_encode(name_, service_name_);
532 return to_void_future(invoke_on_connection(request, context_.get_connection()));
535 void TransactionalObject::on_destroy() {}
537 boost::uuids::uuid TransactionalObject::get_transaction_id()
const {
538 return context_.get_txn_id();
541 std::chrono::milliseconds TransactionalObject::get_timeout()
const {
542 return context_.get_timeout();
546 transaction_context::transaction_context(spi::impl::ClientTransactionManagerServiceImpl &transaction_manager,
549 transaction_manager.connect()),
550 transaction_(options_,
551 transaction_manager.get_client(),
556 return transaction_.get_txn_id();
560 return transaction_.begin();
564 return transaction_.commit();
568 return transaction_.rollback();
575 return transaction_type_;
579 transaction_type_ = type;
588 if (duration.count() <= 0) {
589 BOOST_THROW_EXCEPTION(exception::illegal_state(
"TransactionOptions::setTimeout",
590 "Timeout must be positive!"));
601 if (num_machines < 0) {
602 BOOST_THROW_EXCEPTION(exception::illegal_state(
"TransactionOptions::setDurability",
603 "Durability cannot be negative!"));
605 this->durability_ = num_machines;
612 std::size_t hash<std::pair<std::string, std::string>>::operator()(
613 const std::pair<std::string, std::string> &val)
const noexcept {
614 return std::hash<std::string>{}(val.first + val.second);
boost::future< void > begin_transaction()
Begins a transaction.
boost::future< void > commit_transaction()
Commits a transaction.
boost::future< void > rollback_transaction()
Begins a transaction.
boost::uuids::uuid get_txn_id() const
Contains the configuration for a Hazelcast transaction.
transaction_options & set_timeout(std::chrono::milliseconds duration)
The timeout determines the maximum lifespan of a transaction.
transaction_options & set_transaction_type(transaction_type transaction_type)
Sets the TransactionType.
transaction_type get_transaction_type() const
std::chrono::milliseconds get_timeout() const
transaction_options & set_durability(int num_machines)
Sets the transaction durability.
transaction_type
Transaction type.
transaction_options()
Creates a new default configured TransactionsOptions.
int get_durability() const