17 #include <boost/uuid/uuid_io.hpp> 
   19 #include <hazelcast/client/txn/client_transaction_util.h> 
   20 #include "hazelcast/client/txn/TransactionProxy.h" 
   21 #include "hazelcast/client/transaction_options.h" 
   22 #include "hazelcast/util/Util.h" 
   23 #include "hazelcast/client/spi/impl/ClientInvocation.h" 
   24 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h" 
   25 #include "hazelcast/client/proxy/TransactionalMapImpl.h" 
   26 #include "hazelcast/client/proxy/TransactionalMultiMapImpl.h" 
   27 #include "hazelcast/client/proxy/TransactionalListImpl.h" 
   28 #include "hazelcast/client/proxy/TransactionalQueueImpl.h" 
   29 #include "hazelcast/client/proxy/TransactionalSetImpl.h" 
   30 #include "hazelcast/client/imap.h" 
   31 #include "hazelcast/client/multi_map.h" 
   32 #include "hazelcast/client/ilist.h" 
   33 #include "hazelcast/client/iqueue.h" 
   34 #include "hazelcast/client/iset.h" 
   35 #include "hazelcast/client/transaction_context.h" 
   36 #include "hazelcast/client/spi/impl/ClientTransactionManagerServiceImpl.h" 
   37 #include "hazelcast/client/protocol/codec/codecs.h" 
   42 TransactionProxy::TransactionProxy(
 
   43   transaction_options& txn_options,
 
   44   spi::ClientContext& client_context,
 
   45   std::shared_ptr<connection::Connection> connection)
 
   46   : options_(txn_options)
 
   47   , client_context_(client_context)
 
   48   , connection_(connection)
 
   49   , thread_id_(util::get_current_thread_id())
 
   50   , state_(TxnState::NO_TXN)
 
   53 TransactionProxy::TransactionProxy(
const TransactionProxy& rhs)
 
   54   : options_(rhs.options_)
 
   55   , client_context_(rhs.client_context_)
 
   56   , connection_(rhs.connection_)
 
   57   , thread_id_(rhs.thread_id_)
 
   58   , txn_id_(rhs.txn_id_)
 
   60   , start_time_(rhs.start_time_)
 
   62     transaction_exists_.store(rhs.transaction_exists_.load());
 
   66 TransactionProxy::get_txn_id()
 const 
   72 TransactionProxy::get_state()
 const 
   77 std::chrono::milliseconds
 
   78 TransactionProxy::get_timeout()
 const 
   80     return options_.get_timeout();
 
   84 TransactionProxy::begin()
 
   87         if (state_ == TxnState::ACTIVE) {
 
   88             BOOST_THROW_EXCEPTION(exception::illegal_state(
 
   89               "TransactionProxy::begin()", 
"Transaction is already active"));
 
   92         if (transaction_exists_) {
 
   93             BOOST_THROW_EXCEPTION(
 
   94               exception::illegal_state(
"TransactionProxy::begin()",
 
   95                                        "Nested transactions are not allowed!"));
 
   97         transaction_exists_.store(
true);
 
   98         start_time_ = std::chrono::steady_clock::now();
 
   99         auto request = protocol::codec::transaction_create_encode(
 
  100           std::chrono::duration_cast<std::chrono::milliseconds>(get_timeout())
 
  102           options_.get_durability(),
 
  103           static_cast<int32_t
>(options_.get_transaction_type()),
 
  105         return invoke(request).then(
 
  106           boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
 
  110                   msg.rd_ptr(msg.RESPONSE_HEADER_LEN);
 
  111                   this->txn_id_ = msg.get<boost::uuids::uuid>();
 
  112                   this->state_ = TxnState::ACTIVE;
 
  113               } 
catch (exception::iexception&) {
 
  114                   transaction_exists_.store(
false);
 
  118     } 
catch (exception::iexception&) {
 
  119         transaction_exists_.store(
false);
 
  125 TransactionProxy::commit()
 
  128         if (state_ != TxnState::ACTIVE) {
 
  129             BOOST_THROW_EXCEPTION(exception::illegal_state(
 
  130               "TransactionProxy::commit()", 
"Transaction is not active"));
 
  132         state_ = TxnState::COMMITTING;
 
  137           protocol::codec::transaction_commit_encode(txn_id_, thread_id_);
 
  138         return invoke(request).then(
 
  139           boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
 
  142                   state_ = TxnState::COMMITTED;
 
  143               } 
catch (exception::iexception&) {
 
  144                   transaction_exists_.store(
false);
 
  145                   client_transaction_util::transaction_exception_factory()
 
  146                     ->rethrow(std::current_exception(),
 
  147                               "TransactionProxy::commit() failed");
 
  151         state_ = TxnState::COMMIT_FAILED;
 
  152         transaction_exists_.store(
false);
 
  153         client_transaction_util::transaction_exception_factory()->rethrow(
 
  154           std::current_exception(), 
"TransactionProxy::commit() failed");
 
  155         return boost::make_ready_future();
 
  160 TransactionProxy::rollback()
 
  163         if (state_ == TxnState::NO_TXN || state_ == TxnState::ROLLED_BACK) {
 
  164             BOOST_THROW_EXCEPTION(exception::illegal_state(
 
  165               "TransactionProxy::rollback()", 
"Transaction is not active"));
 
  167         state_ = TxnState::ROLLING_BACK;
 
  171               protocol::codec::transaction_rollback_encode(txn_id_, thread_id_);
 
  172             return invoke(request).then(
 
  174               [=](boost::future<protocol::ClientMessage> f) {
 
  176                       state_ = TxnState::ROLLED_BACK;
 
  177                       transaction_exists_.store(
false);
 
  179                   } 
catch (exception::iexception& e) {
 
  181                         client_context_.get_logger(),
 
  185                             "Exception while rolling back the transaction. " 
  190         } 
catch (exception::iexception& exception) {
 
  191             HZ_LOG(client_context_.get_logger(),
 
  193                    boost::str(boost::format(
 
  194                                 "Exception while rolling back the transaction. " 
  198         state_ = TxnState::ROLLED_BACK;
 
  199         transaction_exists_.store(
false);
 
  200     } 
catch (exception::iexception&) {
 
  201         transaction_exists_.store(
false);
 
  202         client_transaction_util::transaction_exception_factory()->rethrow(
 
  203           std::current_exception(), 
"TransactionProxy::rollback() failed");
 
  205     return boost::make_ready_future();
 
  208 serialization::pimpl::SerializationService&
 
  209 TransactionProxy::get_serialization_service()
 
  211     return client_context_.get_serialization_service();
 
  214 std::shared_ptr<connection::Connection>
 
  215 TransactionProxy::get_connection()
 
  221 TransactionProxy::check_thread()
 
  223     if (thread_id_ != util::get_current_thread_id()) {
 
  224         BOOST_THROW_EXCEPTION(exception::illegal_state(
 
  225           "TransactionProxy::checkThread()",
 
  226           "Transaction cannot span multiple threads!"));
 
  231 TransactionProxy::check_timeout()
 
  233     if (start_time_ + options_.get_timeout() <
 
  234         std::chrono::steady_clock::now()) {
 
  235         BOOST_THROW_EXCEPTION(exception::transaction(
 
  236           "TransactionProxy::checkTimeout()", 
"Transaction is timed-out!"));
 
  240 TxnState::TxnState(state value)
 
  246     values[2] = PREPARING;
 
  247     values[3] = PREPARED;
 
  248     values[4] = COMMITTING;
 
  249     values[5] = COMMITTED;
 
  250     values[6] = COMMIT_FAILED;
 
  251     values[7] = ROLLING_BACK;
 
  252     values[8] = ROLLED_BACK;
 
  255 TxnState::operator int()
 const 
  261 TxnState::operator=(
int i)
 
  266 boost::future<protocol::ClientMessage>
 
  267 TransactionProxy::invoke(protocol::ClientMessage& request)
 
  269     return client_transaction_util::invoke(
 
  271       boost::uuids::to_string(get_txn_id()),
 
  277 TransactionProxy::get_client_context()
 const 
  279     return client_context_;
 
  282 const std::shared_ptr<util::exception_util::runtime_exception_factory>
 
  283   client_transaction_util::exceptionFactory(
 
  284     new class transaction_exception_factory());
 
  286 boost::future<protocol::ClientMessage>
 
  287 client_transaction_util::invoke(
 
  288   protocol::ClientMessage& request,
 
  289   const std::string& object_name,
 
  290   spi::ClientContext& client,
 
  291   const std::shared_ptr<connection::Connection>& connection)
 
  294         std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
 
  295           spi::impl::ClientInvocation::create(
 
  296             client, request, object_name, connection);
 
  297         return clientInvocation->invoke();
 
  298     } 
catch (exception::iexception&) {
 
  299         transaction_exception_factory()->rethrow(
 
  300           std::current_exception(), 
"ClientTransactionUtil::invoke failed");
 
  301         return boost::make_ready_future(protocol::ClientMessage(0));
 
  305 const std::shared_ptr<util::exception_util::runtime_exception_factory>&
 
  306 client_transaction_util::transaction_exception_factory()
 
  308     return exceptionFactory;
 
  312 client_transaction_util::transaction_exception_factory::rethrow(
 
  313   std::exception_ptr throwable,
 
  314   const std::string& message)
 
  317         std::rethrow_exception(throwable);
 
  319         std::throw_with_nested(
 
  320           boost::enable_current_exception(exception::transaction(
 
  321             "transaction_exceptionFactory::create", message)));
 
  327 TransactionalMapImpl::TransactionalMapImpl(
 
  328   const std::string& name,
 
  329   txn::TransactionProxy& transaction_proxy)
 
  330   : TransactionalObject(imap::SERVICE_NAME, name, transaction_proxy)
 
  334 TransactionalMapImpl::contains_key_data(
const serialization::pimpl::data& key)
 
  336     auto request = protocol::codec::transactionalmap_containskey_encode(
 
  337       get_name(), get_transaction_id(), util::get_current_thread_id(), key);
 
  339     return invoke_and_get_future<bool>(request);
 
  342 boost::future<boost::optional<serialization::pimpl::data>>
 
  343 TransactionalMapImpl::get_data(
const serialization::pimpl::data& key)
 
  345     auto request = protocol::codec::transactionalmap_get_encode(
 
  346       get_name(), get_transaction_id(), util::get_current_thread_id(), key);
 
  348     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
  353 TransactionalMapImpl::size()
 
  355     auto request = protocol::codec::transactionalmap_size_encode(
 
  356       get_name(), get_transaction_id(), util::get_current_thread_id());
 
  358     return invoke_and_get_future<int>(request);
 
  362 TransactionalMapImpl::is_empty()
 
  364     auto request = protocol::codec::transactionalmap_isempty_encode(
 
  365       get_name(), get_transaction_id(), util::get_current_thread_id());
 
  367     return invoke_and_get_future<bool>(request);
 
  370 boost::future<boost::optional<serialization::pimpl::data>>
 
  371 TransactionalMapImpl::put_data(
const serialization::pimpl::data& key,
 
  372                                const serialization::pimpl::data& value)
 
  375     auto request = protocol::codec::transactionalmap_put_encode(
 
  377       get_transaction_id(),
 
  378       util::get_current_thread_id(),
 
  381       std::chrono::duration_cast<std::chrono::milliseconds>(get_timeout())
 
  384     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
  389 TransactionalMapImpl::set_data(
const serialization::pimpl::data& key,
 
  390                                const serialization::pimpl::data& value)
 
  392     auto request = protocol::codec::transactionalmap_set_encode(
 
  394       get_transaction_id(),
 
  395       util::get_current_thread_id(),
 
  399     return to_void_future(invoke(request));
 
  402 boost::future<boost::optional<serialization::pimpl::data>>
 
  403 TransactionalMapImpl::put_if_absent_data(
 
  404   const serialization::pimpl::data& key,
 
  405   const serialization::pimpl::data& value)
 
  407     auto request = protocol::codec::transactionalmap_putifabsent_encode(
 
  409       get_transaction_id(),
 
  410       util::get_current_thread_id(),
 
  414     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
  418 boost::future<boost::optional<serialization::pimpl::data>>
 
  419 TransactionalMapImpl::replace_data(
const serialization::pimpl::data& key,
 
  420                                    const serialization::pimpl::data& value)
 
  422     auto request = protocol::codec::transactionalmap_replace_encode(
 
  424       get_transaction_id(),
 
  425       util::get_current_thread_id(),
 
  429     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
  434 TransactionalMapImpl::replace_data(
const serialization::pimpl::data& key,
 
  435                                    const serialization::pimpl::data& old_value,
 
  436                                    const serialization::pimpl::data& new_value)
 
  438     auto request = protocol::codec::transactionalmap_replaceifsame_encode(
 
  440       get_transaction_id(),
 
  441       util::get_current_thread_id(),
 
  446     return invoke_and_get_future<bool>(request);
 
  449 boost::future<boost::optional<serialization::pimpl::data>>
 
  450 TransactionalMapImpl::remove_data(
const serialization::pimpl::data& key)
 
  452     auto request = protocol::codec::transactionalmap_remove_encode(
 
  453       get_name(), get_transaction_id(), util::get_current_thread_id(), key);
 
  455     return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
 
  460 TransactionalMapImpl::delete_entry_data(
const serialization::pimpl::data& key)
 
  462     auto request = protocol::codec::transactionalmap_delete_encode(
 
  463       get_name(), get_transaction_id(), util::get_current_thread_id(), key);
 
  465     return to_void_future(invoke(request));
 
  469 TransactionalMapImpl::remove_data(
const serialization::pimpl::data& key,
 
  470                                   const serialization::pimpl::data& value)
 
  472     auto request = protocol::codec::transactionalmap_removeifsame_encode(
 
  474       get_transaction_id(),
 
  475       util::get_current_thread_id(),
 
  479     return invoke_and_get_future<bool>(request);
 
  482 boost::future<std::vector<serialization::pimpl::data>>
 
  483 TransactionalMapImpl::key_set_data()
 
  485     auto request = protocol::codec::transactionalmap_keyset_encode(
 
  486       get_name(), get_transaction_id(), util::get_current_thread_id());
 
  488     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
  492 boost::future<std::vector<serialization::pimpl::data>>
 
  493 TransactionalMapImpl::key_set_data(
const serialization::pimpl::data& predicate)
 
  495     auto request = protocol::codec::transactionalmap_keysetwithpredicate_encode(
 
  497       get_transaction_id(),
 
  498       util::get_current_thread_id(),
 
  501     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
  505 boost::future<std::vector<serialization::pimpl::data>>
 
  506 TransactionalMapImpl::values_data()
 
  508     auto request = protocol::codec::transactionalmap_values_encode(
 
  509       get_name(), get_transaction_id(), util::get_current_thread_id());
 
  511     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
  515 boost::future<std::vector<serialization::pimpl::data>>
 
  516 TransactionalMapImpl::values_data(
const serialization::pimpl::data& predicate)
 
  518     auto request = protocol::codec::transactionalmap_valueswithpredicate_encode(
 
  520       get_transaction_id(),
 
  521       util::get_current_thread_id(),
 
  524     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
  528 TransactionalMultiMapImpl::TransactionalMultiMapImpl(
 
  529   const std::string& name,
 
  530   txn::TransactionProxy& transaction_proxy)
 
  531   : TransactionalObject(multi_map::SERVICE_NAME, name, transaction_proxy)
 
  535 TransactionalMultiMapImpl::put_data(
const serialization::pimpl::data& key,
 
  536                                     const serialization::pimpl::data& value)
 
  538     auto request = protocol::codec::transactionalmultimap_put_encode(
 
  540       get_transaction_id(),
 
  541       util::get_current_thread_id(),
 
  545     return invoke_and_get_future<bool>(request);
 
  548 boost::future<std::vector<serialization::pimpl::data>>
 
  549 TransactionalMultiMapImpl::get_data(
const serialization::pimpl::data& key)
 
  551     auto request = protocol::codec::transactionalmultimap_get_encode(
 
  552       get_name(), get_transaction_id(), util::get_current_thread_id(), key);
 
  554     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
  559 TransactionalMultiMapImpl::remove(
const serialization::pimpl::data& key,
 
  560                                   const serialization::pimpl::data& value)
 
  562     auto request = protocol::codec::transactionalmultimap_removeentry_encode(
 
  564       get_transaction_id(),
 
  565       util::get_current_thread_id(),
 
  569     return invoke_and_get_future<bool>(request);
 
  572 boost::future<std::vector<serialization::pimpl::data>>
 
  573 TransactionalMultiMapImpl::remove_data(
const serialization::pimpl::data& key)
 
  575     auto request = protocol::codec::transactionalmultimap_remove_encode(
 
  576       get_name(), get_transaction_id(), util::get_current_thread_id(), key);
 
  578     return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
 
  583 TransactionalMultiMapImpl::value_count(
const serialization::pimpl::data& key)
 
  585     auto request = protocol::codec::transactionalmultimap_valuecount_encode(
 
  586       get_name(), get_transaction_id(), util::get_current_thread_id(), key);
 
  588     return invoke_and_get_future<int>(request);
 
  592 TransactionalMultiMapImpl::size()
 
  594     auto request = protocol::codec::transactionalmultimap_size_encode(
 
  595       get_name(), get_transaction_id(), util::get_current_thread_id());
 
  597     return invoke_and_get_future<int>(request);
 
  600 TransactionalListImpl::TransactionalListImpl(
const std::string& object_name,
 
  601                                              txn::TransactionProxy& context)
 
  602   : TransactionalObject(ilist::SERVICE_NAME, object_name, context)
 
  606 TransactionalListImpl::add(
const serialization::pimpl::data& e)
 
  608     auto request = protocol::codec::transactionallist_add_encode(
 
  609       get_name(), get_transaction_id(), util::get_current_thread_id(), e);
 
  611     return invoke_and_get_future<bool>(request);
 
  615 TransactionalListImpl::remove(
const serialization::pimpl::data& e)
 
  617     auto request = protocol::codec::transactionallist_remove_encode(
 
  618       get_name(), get_transaction_id(), util::get_current_thread_id(), e);
 
  620     return invoke_and_get_future<bool>(request);
 
  624 TransactionalListImpl::size()
 
  626     auto request = protocol::codec::transactionallist_size_encode(
 
  627       get_name(), get_transaction_id(), util::get_current_thread_id());
 
  629     return invoke_and_get_future<int>(request);
 
  632 TransactionalSetImpl::TransactionalSetImpl(
 
  633   const std::string& name,
 
  634   txn::TransactionProxy& transaction_proxy)
 
  635   : TransactionalObject(iset::SERVICE_NAME, name, transaction_proxy)
 
  639 TransactionalSetImpl::add_data(
const serialization::pimpl::data& e)
 
  641     auto request = protocol::codec::transactionalset_add_encode(
 
  642       get_name(), get_transaction_id(), util::get_current_thread_id(), e);
 
  644     return invoke_and_get_future<bool>(request);
 
  648 TransactionalSetImpl::remove_data(
const serialization::pimpl::data& e)
 
  650     auto request = protocol::codec::transactionalset_remove_encode(
 
  651       get_name(), get_transaction_id(), util::get_current_thread_id(), e);
 
  653     return invoke_and_get_future<bool>(request);
 
  657 TransactionalSetImpl::size()
 
  659     auto request = protocol::codec::transactionalset_size_encode(
 
  660       get_name(), get_transaction_id(), util::get_current_thread_id());
 
  662     return invoke_and_get_future<int>(request);
 
  665 TransactionalObject::TransactionalObject(
const std::string& service_name,
 
  666                                          const std::string& object_name,
 
  667                                          txn::TransactionProxy& context)
 
  668   : proxy::SerializingProxy(context.get_client_context(), object_name)
 
  669   , service_name_(service_name)
 
  674 TransactionalObject::~TransactionalObject() = 
default;
 
  677 TransactionalObject::get_service_name()
 
  679     return service_name_;
 
  683 TransactionalObject::get_name()
 
  689 TransactionalObject::destroy()
 
  693       protocol::codec::client_destroyproxy_encode(name_, service_name_);
 
  694     return to_void_future(
 
  695       invoke_on_connection(request, context_.get_connection()));
 
  699 TransactionalObject::on_destroy()
 
  703 TransactionalObject::get_transaction_id()
 const 
  705     return context_.get_txn_id();
 
  708 std::chrono::milliseconds
 
  709 TransactionalObject::get_timeout()
 const 
  711     return context_.get_timeout();
 
  715 transaction_context::transaction_context(
 
  716   spi::impl::ClientTransactionManagerServiceImpl& transaction_manager,
 
  718   : options_(txn_options)
 
  719   , txn_connection_(transaction_manager.connect())
 
  720   , transaction_(options_, transaction_manager.get_client(), txn_connection_)
 
  726     return transaction_.get_txn_id();
 
  732     return transaction_.begin();
 
  738     return transaction_.commit();
 
  744     return transaction_.rollback();
 
  748   : timeout_(std::chrono::minutes(2))
 
  756     return transaction_type_;
 
  762     transaction_type_ = type;
 
  766 std::chrono::milliseconds
 
  775     if (duration.count() <= 0) {
 
  776         BOOST_THROW_EXCEPTION(exception::illegal_state(
 
  777           "TransactionOptions::setTimeout", 
"Timeout must be positive!"));
 
  792     if (num_machines < 0) {
 
  793         BOOST_THROW_EXCEPTION(
 
  794           exception::illegal_state(
"TransactionOptions::setDurability",
 
  795                                    "Durability cannot be negative!"));
 
  797     this->durability_ = num_machines;
 
  805 hash<std::pair<std::string, std::string>>::operator()(
 
  806   const std::pair<std::string, std::string>& val) 
const noexcept
 
  808     return std::hash<std::string>{}(val.first + val.second);
 
boost::future< void > begin_transaction()
Begins a transaction.
 
boost::future< void > commit_transaction()
Commits a transaction.
 
boost::future< void > rollback_transaction()
Begins a transaction.
 
boost::uuids::uuid get_txn_id() const
 
Contains the configuration for a Hazelcast transaction.
 
transaction_options & set_timeout(std::chrono::milliseconds duration)
The timeout determines the maximum lifespan of a transaction.
 
transaction_options & set_transaction_type(transaction_type transaction_type)
Sets the TransactionType.
 
transaction_type get_transaction_type() const
 
std::chrono::milliseconds get_timeout() const
 
transaction_options & set_durability(int num_machines)
Sets the transaction durability.
 
transaction_type
Transaction type.
 
transaction_options()
Creates a new default configured TransactionsOptions.
 
int get_durability() const