32 #include <boost/uuid/uuid_io.hpp>
34 #include <hazelcast/client/txn/client_transaction_util.h>
35 #include "hazelcast/client/txn/TransactionProxy.h"
36 #include "hazelcast/client/transaction_options.h"
37 #include "hazelcast/util/Util.h"
38 #include "hazelcast/client/spi/impl/ClientInvocation.h"
39 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
40 #include "hazelcast/client/proxy/TransactionalMapImpl.h"
41 #include "hazelcast/client/proxy/TransactionalMultiMapImpl.h"
42 #include "hazelcast/client/proxy/TransactionalListImpl.h"
43 #include "hazelcast/client/proxy/TransactionalQueueImpl.h"
44 #include "hazelcast/client/proxy/TransactionalSetImpl.h"
45 #include "hazelcast/client/imap.h"
46 #include "hazelcast/client/multi_map.h"
47 #include "hazelcast/client/ilist.h"
48 #include "hazelcast/client/iqueue.h"
49 #include "hazelcast/client/iset.h"
50 #include "hazelcast/client/transaction_context.h"
51 #include "hazelcast/client/spi/impl/ClientTransactionManagerServiceImpl.h"
52 #include "hazelcast/client/protocol/codec/codecs.h"
57 TransactionProxy::TransactionProxy(transaction_options &txn_options, spi::ClientContext &client_context,
58 std::shared_ptr<connection::Connection> connection)
59 : options_(txn_options), client_context_(client_context), connection_(connection),
60 thread_id_(util::get_current_thread_id()), state_(TxnState::NO_TXN) {}
62 TransactionProxy::TransactionProxy(
const TransactionProxy &rhs) : options_(rhs.options_),
63 client_context_(rhs.client_context_),
64 connection_(rhs.connection_),
65 thread_id_(rhs.thread_id_), txn_id_(rhs.txn_id_),
67 start_time_(rhs.start_time_) {
68 transaction_exists_.store(rhs.transaction_exists_.load());
71 boost::uuids::uuid TransactionProxy::get_txn_id()
const {
75 TxnState TransactionProxy::get_state()
const {
79 std::chrono::milliseconds TransactionProxy::get_timeout()
const {
80 return options_.get_timeout();
83 boost::future<void> TransactionProxy::begin() {
85 if (state_ == TxnState::ACTIVE) {
86 BOOST_THROW_EXCEPTION(exception::illegal_state(
"TransactionProxy::begin()",
87 "Transaction is already active"));
90 if (transaction_exists_) {
91 BOOST_THROW_EXCEPTION(exception::illegal_state(
"TransactionProxy::begin()",
92 "Nested transactions are not allowed!"));
94 transaction_exists_.store(
true);
95 start_time_ = std::chrono::steady_clock::now();
96 auto request = protocol::codec::transaction_create_encode(
97 std::chrono::duration_cast<std::chrono::milliseconds>(get_timeout()).count(), options_.get_durability(),
98 static_cast<int32_t
>(options_.get_transaction_type()), thread_id_);
99 return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
103 msg.rd_ptr(msg.RESPONSE_HEADER_LEN);
104 this->txn_id_ = msg.get<boost::uuids::uuid>();
105 this->state_ = TxnState::ACTIVE;
106 }
catch (exception::iexception &) {
107 transaction_exists_.store(
false);
111 }
catch (exception::iexception &) {
112 transaction_exists_.store(
false);
117 boost::future<void> TransactionProxy::commit() {
119 if (state_ != TxnState::ACTIVE) {
120 BOOST_THROW_EXCEPTION(exception::illegal_state(
"TransactionProxy::commit()",
121 "Transaction is not active"));
123 state_ = TxnState::COMMITTING;
127 auto request = protocol::codec::transaction_commit_encode(txn_id_, thread_id_);
128 return invoke(request).then(boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
131 state_ = TxnState::COMMITTED;
132 }
catch (exception::iexception &) {
133 transaction_exists_.store(
false);
134 client_transaction_util::transaction_exception_factory()->rethrow(std::current_exception(),
135 "TransactionProxy::commit() failed");
139 state_ = TxnState::COMMIT_FAILED;
140 transaction_exists_.store(
false);
141 client_transaction_util::transaction_exception_factory()->rethrow(std::current_exception(),
142 "TransactionProxy::commit() failed");
143 return boost::make_ready_future();
147 boost::future<void> TransactionProxy::rollback() {
149 if (state_ == TxnState::NO_TXN || state_ == TxnState::ROLLED_BACK) {
150 BOOST_THROW_EXCEPTION(exception::illegal_state(
"TransactionProxy::rollback()",
151 "Transaction is not active"));
153 state_ = TxnState::ROLLING_BACK;
156 auto request = protocol::codec::transaction_rollback_encode(txn_id_, thread_id_);
157 return invoke(request).then(boost::launch::sync,
158 [=](boost::future<protocol::ClientMessage> f) {
160 state_ = TxnState::ROLLED_BACK;
161 transaction_exists_.store(
false);
163 }
catch (exception::iexception &e) {
164 HZ_LOG(client_context_.get_logger(), warning,
165 boost::str(boost::format(
166 "Exception while rolling back the transaction. "
172 }
catch (exception::iexception &exception) {
173 HZ_LOG(client_context_.get_logger(), warning,
174 boost::str(boost::format(
"Exception while rolling back the transaction. "
179 state_ = TxnState::ROLLED_BACK;
180 transaction_exists_.store(
false);
181 }
catch (exception::iexception &) {
182 transaction_exists_.store(
false);
183 client_transaction_util::transaction_exception_factory()->rethrow(std::current_exception(),
184 "TransactionProxy::rollback() failed");
186 return boost::make_ready_future();
189 serialization::pimpl::SerializationService &TransactionProxy::get_serialization_service() {
190 return client_context_.get_serialization_service();
193 std::shared_ptr<connection::Connection> TransactionProxy::get_connection() {
197 void TransactionProxy::check_thread() {
198 if (thread_id_ != util::get_current_thread_id()) {
199 BOOST_THROW_EXCEPTION(exception::illegal_state(
"TransactionProxy::checkThread()",
200 "Transaction cannot span multiple threads!"));
204 void TransactionProxy::check_timeout() {
205 if (start_time_ + options_.get_timeout() < std::chrono::steady_clock::now()) {
206 BOOST_THROW_EXCEPTION(exception::transaction(
"TransactionProxy::checkTimeout()",
207 "Transaction is timed-out!"));
211 TxnState::TxnState(state value)
216 values[2] = PREPARING;
217 values[3] = PREPARED;
218 values[4] = COMMITTING;
219 values[5] = COMMITTED;
220 values[6] = COMMIT_FAILED;
221 values[7] = ROLLING_BACK;
222 values[8] = ROLLED_BACK;
225 TxnState::operator int()
const {
229 void TxnState::operator=(
int i) {
233 boost::future<protocol::ClientMessage> TransactionProxy::invoke(protocol::ClientMessage &request) {
234 return client_transaction_util::invoke(request, boost::uuids::to_string(get_txn_id()), client_context_, connection_);
237 spi::ClientContext &TransactionProxy::get_client_context()
const {
238 return client_context_;
241 const std::shared_ptr<util::exception_util::runtime_exception_factory> client_transaction_util::exceptionFactory(
242 new class transaction_exception_factory());
244 boost::future<protocol::ClientMessage>
245 client_transaction_util::invoke(protocol::ClientMessage &request,
246 const std::string &object_name,
247 spi::ClientContext &client,
248 const std::shared_ptr<connection::Connection> &connection) {
250 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation = spi::impl::ClientInvocation::create(
251 client, request, object_name, connection);
252 return clientInvocation->invoke();
253 }
catch (exception::iexception &) {
254 transaction_exception_factory()->rethrow(std::current_exception(),
255 "ClientTransactionUtil::invoke failed");
256 return boost::make_ready_future(protocol::ClientMessage(0));
260 const std::shared_ptr<util::exception_util::runtime_exception_factory> &
261 client_transaction_util::transaction_exception_factory() {
262 return exceptionFactory;
266 client_transaction_util::transaction_exception_factory::rethrow(std::exception_ptr throwable,
267 const std::string &message) {
269 std::rethrow_exception(throwable);
271 std::throw_with_nested(
272 boost::enable_current_exception(
273 exception::transaction(
"transaction_exceptionFactory::create", message)));
279 TransactionalMapImpl::TransactionalMapImpl(
const std::string &name, txn::TransactionProxy &transaction_proxy)
280 : TransactionalObject(imap::SERVICE_NAME, name, transaction_proxy) {}
282 boost::future<bool> TransactionalMapImpl::contains_key_data(
const serialization::pimpl::data &key) {
283 auto request = protocol::codec::transactionalmap_containskey_encode(
284 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
286 return invoke_and_get_future<bool>(request);
289 boost::future<boost::optional<serialization::pimpl::data>>
290 TransactionalMapImpl::get_data(
const serialization::pimpl::data &key) {
291 auto request = protocol::codec::transactionalmap_get_encode(
292 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
294 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
298 boost::future<int> TransactionalMapImpl::size() {
299 auto request = protocol::codec::transactionalmap_size_encode(
300 get_name(), get_transaction_id(), util::get_current_thread_id());
302 return invoke_and_get_future<int>(request);
305 boost::future<bool> TransactionalMapImpl::is_empty() {
306 auto request = protocol::codec::transactionalmap_isempty_encode(
307 get_name(), get_transaction_id(), util::get_current_thread_id());
309 return invoke_and_get_future<bool>(
313 boost::future<boost::optional<serialization::pimpl::data>> TransactionalMapImpl::put_data(
314 const serialization::pimpl::data &key,
const serialization::pimpl::data &value) {
316 auto request = protocol::codec::transactionalmap_put_encode(
317 get_name(), get_transaction_id(), util::get_current_thread_id(), key, value,
318 std::chrono::duration_cast<std::chrono::milliseconds>(get_timeout()).count());
320 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
325 TransactionalMapImpl::set_data(
const serialization::pimpl::data &key,
const serialization::pimpl::data &value) {
326 auto request = protocol::codec::transactionalmap_set_encode(
327 get_name(), get_transaction_id(), util::get_current_thread_id(), key, value);
329 return to_void_future(invoke(request));
332 boost::future<boost::optional<serialization::pimpl::data>>
333 TransactionalMapImpl::put_if_absent_data(
const serialization::pimpl::data &key,
334 const serialization::pimpl::data &value) {
335 auto request = protocol::codec::transactionalmap_putifabsent_encode(
336 get_name(), get_transaction_id(), util::get_current_thread_id(), key, value);
338 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
342 boost::future<boost::optional<serialization::pimpl::data>>
343 TransactionalMapImpl::replace_data(
const serialization::pimpl::data &key,
344 const serialization::pimpl::data &value) {
345 auto request = protocol::codec::transactionalmap_replace_encode(
346 get_name(), get_transaction_id(), util::get_current_thread_id(), key, value);
348 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
352 boost::future<bool> TransactionalMapImpl::replace_data(
const serialization::pimpl::data &key,
353 const serialization::pimpl::data &old_value,
354 const serialization::pimpl::data &new_value) {
355 auto request = protocol::codec::transactionalmap_replaceifsame_encode(
356 get_name(), get_transaction_id(), util::get_current_thread_id(), key, old_value, new_value);
358 return invoke_and_get_future<bool>(
362 boost::future<boost::optional<serialization::pimpl::data>>
363 TransactionalMapImpl::remove_data(
const serialization::pimpl::data &key) {
364 auto request = protocol::codec::transactionalmap_remove_encode(
365 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
367 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
371 boost::future<void> TransactionalMapImpl::delete_entry_data(
const serialization::pimpl::data &key) {
372 auto request = protocol::codec::transactionalmap_delete_encode(
373 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
375 return to_void_future(invoke(request));
378 boost::future<bool> TransactionalMapImpl::remove_data(
const serialization::pimpl::data &key,
379 const serialization::pimpl::data &value) {
380 auto request = protocol::codec::transactionalmap_removeifsame_encode(
381 get_name(), get_transaction_id(), util::get_current_thread_id(), key, value);
383 return invoke_and_get_future<bool>(
387 boost::future<std::vector<serialization::pimpl::data>> TransactionalMapImpl::key_set_data() {
388 auto request = protocol::codec::transactionalmap_keyset_encode(
389 get_name(), get_transaction_id(), util::get_current_thread_id());
391 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
395 boost::future<std::vector<serialization::pimpl::data>>
396 TransactionalMapImpl::key_set_data(
const serialization::pimpl::data &predicate) {
397 auto request = protocol::codec::transactionalmap_keysetwithpredicate_encode(
398 get_name(), get_transaction_id(), util::get_current_thread_id(), predicate);
400 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
404 boost::future<std::vector<serialization::pimpl::data>> TransactionalMapImpl::values_data() {
405 auto request = protocol::codec::transactionalmap_values_encode(
406 get_name(), get_transaction_id(), util::get_current_thread_id());
408 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
412 boost::future<std::vector<serialization::pimpl::data>>
413 TransactionalMapImpl::values_data(
const serialization::pimpl::data &predicate) {
414 auto request = protocol::codec::transactionalmap_valueswithpredicate_encode(
415 get_name(), get_transaction_id(), util::get_current_thread_id(), predicate);
417 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
421 TransactionalMultiMapImpl::TransactionalMultiMapImpl(
const std::string &name,
422 txn::TransactionProxy &transaction_proxy)
423 : TransactionalObject(multi_map::SERVICE_NAME, name, transaction_proxy) {}
425 boost::future<bool> TransactionalMultiMapImpl::put_data(
const serialization::pimpl::data &key,
426 const serialization::pimpl::data &value) {
427 auto request = protocol::codec::transactionalmultimap_put_encode(
428 get_name(), get_transaction_id(), util::get_current_thread_id(), key, value);
430 return invoke_and_get_future<bool>(
434 boost::future<std::vector<serialization::pimpl::data>> TransactionalMultiMapImpl::get_data(
435 const serialization::pimpl::data &key) {
436 auto request = protocol::codec::transactionalmultimap_get_encode(
437 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
439 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
443 boost::future<bool> TransactionalMultiMapImpl::remove(
const serialization::pimpl::data &key,
444 const serialization::pimpl::data &value) {
445 auto request = protocol::codec::transactionalmultimap_removeentry_encode(
446 get_name(), get_transaction_id(), util::get_current_thread_id(), key, value);
448 return invoke_and_get_future<bool>(
452 boost::future<std::vector<serialization::pimpl::data>> TransactionalMultiMapImpl::remove_data(
453 const serialization::pimpl::data &key) {
454 auto request = protocol::codec::transactionalmultimap_remove_encode(
455 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
457 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
461 boost::future<int> TransactionalMultiMapImpl::value_count(
const serialization::pimpl::data &key) {
462 auto request = protocol::codec::transactionalmultimap_valuecount_encode(
463 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
465 return invoke_and_get_future<int>(
469 boost::future<int> TransactionalMultiMapImpl::size() {
470 auto request = protocol::codec::transactionalmultimap_size_encode(
471 get_name(), get_transaction_id(), util::get_current_thread_id());
473 return invoke_and_get_future<int>(
477 TransactionalListImpl::TransactionalListImpl(
const std::string &object_name, txn::TransactionProxy &context)
478 : TransactionalObject(ilist::SERVICE_NAME, object_name, context) {}
480 boost::future<bool> TransactionalListImpl::add(
const serialization::pimpl::data &e) {
481 auto request = protocol::codec::transactionallist_add_encode(
482 get_name(), get_transaction_id(), util::get_current_thread_id(), e);
484 return invoke_and_get_future<bool>(
488 boost::future<bool> TransactionalListImpl::remove(
const serialization::pimpl::data &e) {
489 auto request = protocol::codec::transactionallist_remove_encode(
490 get_name(), get_transaction_id(), util::get_current_thread_id(), e);
492 return invoke_and_get_future<bool>(
496 boost::future<int> TransactionalListImpl::size() {
497 auto request = protocol::codec::transactionallist_size_encode(
498 get_name(), get_transaction_id(), util::get_current_thread_id());
500 return invoke_and_get_future<int>(
504 TransactionalSetImpl::TransactionalSetImpl(
const std::string &name, txn::TransactionProxy &transaction_proxy)
505 : TransactionalObject(iset::SERVICE_NAME, name, transaction_proxy) {}
507 boost::future<bool> TransactionalSetImpl::add_data(
const serialization::pimpl::data &e) {
508 auto request = protocol::codec::transactionalset_add_encode(
509 get_name(), get_transaction_id(), util::get_current_thread_id(), e);
511 return invoke_and_get_future<bool>(request);
514 boost::future<bool> TransactionalSetImpl::remove_data(
const serialization::pimpl::data &e) {
515 auto request = protocol::codec::transactionalset_remove_encode(
516 get_name(), get_transaction_id(), util::get_current_thread_id(), e);
518 return invoke_and_get_future<bool>(
522 boost::future<int> TransactionalSetImpl::size() {
523 auto request = protocol::codec::transactionalset_size_encode(
524 get_name(), get_transaction_id(), util::get_current_thread_id());
526 return invoke_and_get_future<int>(request);
529 TransactionalObject::TransactionalObject(
const std::string &service_name,
const std::string &object_name,
530 txn::TransactionProxy &context)
531 : proxy::SerializingProxy(context.get_client_context(), object_name), service_name_(service_name),
532 name_(object_name), context_(context) {}
534 TransactionalObject::~TransactionalObject() =
default;
536 const std::string &TransactionalObject::get_service_name() {
537 return service_name_;
540 const std::string &TransactionalObject::get_name() {
544 boost::future<void> TransactionalObject::destroy() {
546 auto request = protocol::codec::client_destroyproxy_encode(name_, service_name_);
547 return to_void_future(invoke_on_connection(request, context_.get_connection()));
550 void TransactionalObject::on_destroy() {}
552 boost::uuids::uuid TransactionalObject::get_transaction_id()
const {
553 return context_.get_txn_id();
556 std::chrono::milliseconds TransactionalObject::get_timeout()
const {
557 return context_.get_timeout();
561 transaction_context::transaction_context(spi::impl::ClientTransactionManagerServiceImpl &transaction_manager,
564 transaction_manager.connect()),
565 transaction_(options_,
566 transaction_manager.get_client(),
571 return transaction_.get_txn_id();
575 return transaction_.begin();
579 return transaction_.commit();
583 return transaction_.rollback();
590 return transaction_type_;
594 transaction_type_ = type;
603 if (duration.count() <= 0) {
604 BOOST_THROW_EXCEPTION(exception::illegal_state(
"TransactionOptions::setTimeout",
605 "Timeout must be positive!"));
616 if (num_machines < 0) {
617 BOOST_THROW_EXCEPTION(exception::illegal_state(
"TransactionOptions::setDurability",
618 "Durability cannot be negative!"));
620 this->durability_ = num_machines;
627 std::size_t hash<std::pair<std::string, std::string>>::operator()(
628 const std::pair<std::string, std::string> &val)
const noexcept {
629 return std::hash<std::string>{}(val.first + val.second);
boost::future< void > begin_transaction()
Begins a transaction.
boost::future< void > commit_transaction()
Commits a transaction.
boost::future< void > rollback_transaction()
Begins a transaction.
boost::uuids::uuid get_txn_id() const
Contains the configuration for a Hazelcast transaction.
transaction_options & set_timeout(std::chrono::milliseconds duration)
The timeout determines the maximum lifespan of a transaction.
transaction_options & set_transaction_type(transaction_type transaction_type)
Sets the TransactionType.
transaction_type get_transaction_type() const
std::chrono::milliseconds get_timeout() const
transaction_options & set_durability(int num_machines)
Sets the transaction durability.
transaction_type
Transaction type.
transaction_options()
Creates a new default configured TransactionsOptions.
int get_durability() const