17 #include <boost/uuid/uuid_io.hpp>
19 #include <hazelcast/client/txn/client_transaction_util.h>
20 #include "hazelcast/client/txn/TransactionProxy.h"
21 #include "hazelcast/client/transaction_options.h"
22 #include "hazelcast/util/Util.h"
23 #include "hazelcast/client/spi/impl/ClientInvocation.h"
24 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
25 #include "hazelcast/client/proxy/TransactionalMapImpl.h"
26 #include "hazelcast/client/proxy/TransactionalMultiMapImpl.h"
27 #include "hazelcast/client/proxy/TransactionalListImpl.h"
28 #include "hazelcast/client/proxy/TransactionalQueueImpl.h"
29 #include "hazelcast/client/proxy/TransactionalSetImpl.h"
30 #include "hazelcast/client/imap.h"
31 #include "hazelcast/client/multi_map.h"
32 #include "hazelcast/client/ilist.h"
33 #include "hazelcast/client/iqueue.h"
34 #include "hazelcast/client/iset.h"
35 #include "hazelcast/client/transaction_context.h"
36 #include "hazelcast/client/spi/impl/ClientTransactionManagerServiceImpl.h"
37 #include "hazelcast/client/protocol/codec/codecs.h"
42 TransactionProxy::TransactionProxy(
43 transaction_options& txn_options,
44 spi::ClientContext& client_context,
45 std::shared_ptr<connection::Connection> connection)
46 : options_(txn_options)
47 , client_context_(client_context)
48 , connection_(connection)
49 , thread_id_(util::get_current_thread_id())
50 , state_(TxnState::NO_TXN)
53 TransactionProxy::TransactionProxy(
const TransactionProxy& rhs)
54 : options_(rhs.options_)
55 , client_context_(rhs.client_context_)
56 , connection_(rhs.connection_)
57 , thread_id_(rhs.thread_id_)
58 , txn_id_(rhs.txn_id_)
60 , start_time_(rhs.start_time_)
62 transaction_exists_.store(rhs.transaction_exists_.load());
66 TransactionProxy::get_txn_id()
const
72 TransactionProxy::get_state()
const
77 std::chrono::milliseconds
78 TransactionProxy::get_timeout()
const
80 return options_.get_timeout();
84 TransactionProxy::begin()
87 if (state_ == TxnState::ACTIVE) {
88 BOOST_THROW_EXCEPTION(exception::illegal_state(
89 "TransactionProxy::begin()",
"Transaction is already active"));
92 if (transaction_exists_) {
93 BOOST_THROW_EXCEPTION(
94 exception::illegal_state(
"TransactionProxy::begin()",
95 "Nested transactions are not allowed!"));
97 transaction_exists_.store(
true);
98 start_time_ = std::chrono::steady_clock::now();
99 auto request = protocol::codec::transaction_create_encode(
100 std::chrono::duration_cast<std::chrono::milliseconds>(get_timeout())
102 options_.get_durability(),
103 static_cast<int32_t
>(options_.get_transaction_type()),
105 return invoke(request).then(
106 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
110 msg.rd_ptr(msg.RESPONSE_HEADER_LEN);
111 this->txn_id_ = msg.get<boost::uuids::uuid>();
112 this->state_ = TxnState::ACTIVE;
113 }
catch (exception::iexception&) {
114 transaction_exists_.store(
false);
118 }
catch (exception::iexception&) {
119 transaction_exists_.store(
false);
125 TransactionProxy::commit()
128 if (state_ != TxnState::ACTIVE) {
129 BOOST_THROW_EXCEPTION(exception::illegal_state(
130 "TransactionProxy::commit()",
"Transaction is not active"));
132 state_ = TxnState::COMMITTING;
137 protocol::codec::transaction_commit_encode(txn_id_, thread_id_);
138 return invoke(request).then(
139 boost::launch::sync, [=](boost::future<protocol::ClientMessage> f) {
142 state_ = TxnState::COMMITTED;
143 }
catch (exception::iexception&) {
144 transaction_exists_.store(
false);
145 client_transaction_util::transaction_exception_factory()
146 ->rethrow(std::current_exception(),
147 "TransactionProxy::commit() failed");
151 state_ = TxnState::COMMIT_FAILED;
152 transaction_exists_.store(
false);
153 client_transaction_util::transaction_exception_factory()->rethrow(
154 std::current_exception(),
"TransactionProxy::commit() failed");
155 return boost::make_ready_future();
160 TransactionProxy::rollback()
163 if (state_ == TxnState::NO_TXN || state_ == TxnState::ROLLED_BACK) {
164 BOOST_THROW_EXCEPTION(exception::illegal_state(
165 "TransactionProxy::rollback()",
"Transaction is not active"));
167 state_ = TxnState::ROLLING_BACK;
171 protocol::codec::transaction_rollback_encode(txn_id_, thread_id_);
172 return invoke(request).then(
174 [=](boost::future<protocol::ClientMessage> f) {
176 state_ = TxnState::ROLLED_BACK;
177 transaction_exists_.store(
false);
179 }
catch (exception::iexception& e) {
181 client_context_.get_logger(),
185 "Exception while rolling back the transaction. "
190 }
catch (exception::iexception& exception) {
191 HZ_LOG(client_context_.get_logger(),
193 boost::str(boost::format(
194 "Exception while rolling back the transaction. "
198 state_ = TxnState::ROLLED_BACK;
199 transaction_exists_.store(
false);
200 }
catch (exception::iexception&) {
201 transaction_exists_.store(
false);
202 client_transaction_util::transaction_exception_factory()->rethrow(
203 std::current_exception(),
"TransactionProxy::rollback() failed");
205 return boost::make_ready_future();
208 serialization::pimpl::SerializationService&
209 TransactionProxy::get_serialization_service()
211 return client_context_.get_serialization_service();
214 std::shared_ptr<connection::Connection>
215 TransactionProxy::get_connection()
221 TransactionProxy::check_thread()
223 if (thread_id_ != util::get_current_thread_id()) {
224 BOOST_THROW_EXCEPTION(exception::illegal_state(
225 "TransactionProxy::checkThread()",
226 "Transaction cannot span multiple threads!"));
231 TransactionProxy::check_timeout()
233 if (start_time_ + options_.get_timeout() <
234 std::chrono::steady_clock::now()) {
235 BOOST_THROW_EXCEPTION(exception::transaction(
236 "TransactionProxy::checkTimeout()",
"Transaction is timed-out!"));
240 TxnState::TxnState(state value)
246 values[2] = PREPARING;
247 values[3] = PREPARED;
248 values[4] = COMMITTING;
249 values[5] = COMMITTED;
250 values[6] = COMMIT_FAILED;
251 values[7] = ROLLING_BACK;
252 values[8] = ROLLED_BACK;
255 TxnState::operator int()
const
261 TxnState::operator=(
int i)
266 boost::future<protocol::ClientMessage>
267 TransactionProxy::invoke(protocol::ClientMessage& request)
269 return client_transaction_util::invoke(
271 boost::uuids::to_string(get_txn_id()),
277 TransactionProxy::get_client_context()
const
279 return client_context_;
282 const std::shared_ptr<util::exception_util::runtime_exception_factory>
283 client_transaction_util::exceptionFactory(
284 new class transaction_exception_factory());
286 boost::future<protocol::ClientMessage>
287 client_transaction_util::invoke(
288 protocol::ClientMessage& request,
289 const std::string& object_name,
290 spi::ClientContext& client,
291 const std::shared_ptr<connection::Connection>& connection)
294 std::shared_ptr<spi::impl::ClientInvocation> clientInvocation =
295 spi::impl::ClientInvocation::create(
296 client, request, object_name, connection);
297 return clientInvocation->invoke();
298 }
catch (exception::iexception&) {
299 transaction_exception_factory()->rethrow(
300 std::current_exception(),
"ClientTransactionUtil::invoke failed");
301 return boost::make_ready_future(protocol::ClientMessage(0));
305 const std::shared_ptr<util::exception_util::runtime_exception_factory>&
306 client_transaction_util::transaction_exception_factory()
308 return exceptionFactory;
312 client_transaction_util::transaction_exception_factory::rethrow(
313 std::exception_ptr throwable,
314 const std::string& message)
317 std::rethrow_exception(throwable);
319 std::throw_with_nested(
320 boost::enable_current_exception(exception::transaction(
321 "transaction_exceptionFactory::create", message)));
327 TransactionalMapImpl::TransactionalMapImpl(
328 const std::string& name,
329 txn::TransactionProxy& transaction_proxy)
330 : TransactionalObject(imap::SERVICE_NAME, name, transaction_proxy)
334 TransactionalMapImpl::contains_key_data(
const serialization::pimpl::data& key)
336 auto request = protocol::codec::transactionalmap_containskey_encode(
337 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
339 return invoke_and_get_future<bool>(request);
342 boost::future<boost::optional<serialization::pimpl::data>>
343 TransactionalMapImpl::get_data(
const serialization::pimpl::data& key)
345 auto request = protocol::codec::transactionalmap_get_encode(
346 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
348 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
353 TransactionalMapImpl::size()
355 auto request = protocol::codec::transactionalmap_size_encode(
356 get_name(), get_transaction_id(), util::get_current_thread_id());
358 return invoke_and_get_future<int>(request);
362 TransactionalMapImpl::is_empty()
364 auto request = protocol::codec::transactionalmap_isempty_encode(
365 get_name(), get_transaction_id(), util::get_current_thread_id());
367 return invoke_and_get_future<bool>(request);
370 boost::future<boost::optional<serialization::pimpl::data>>
371 TransactionalMapImpl::put_data(
const serialization::pimpl::data& key,
372 const serialization::pimpl::data& value)
375 auto request = protocol::codec::transactionalmap_put_encode(
377 get_transaction_id(),
378 util::get_current_thread_id(),
381 std::chrono::duration_cast<std::chrono::milliseconds>(get_timeout())
384 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
389 TransactionalMapImpl::set_data(
const serialization::pimpl::data& key,
390 const serialization::pimpl::data& value)
392 auto request = protocol::codec::transactionalmap_set_encode(
394 get_transaction_id(),
395 util::get_current_thread_id(),
399 return to_void_future(invoke(request));
402 boost::future<boost::optional<serialization::pimpl::data>>
403 TransactionalMapImpl::put_if_absent_data(
404 const serialization::pimpl::data& key,
405 const serialization::pimpl::data& value)
407 auto request = protocol::codec::transactionalmap_putifabsent_encode(
409 get_transaction_id(),
410 util::get_current_thread_id(),
414 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
418 boost::future<boost::optional<serialization::pimpl::data>>
419 TransactionalMapImpl::replace_data(
const serialization::pimpl::data& key,
420 const serialization::pimpl::data& value)
422 auto request = protocol::codec::transactionalmap_replace_encode(
424 get_transaction_id(),
425 util::get_current_thread_id(),
429 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
434 TransactionalMapImpl::replace_data(
const serialization::pimpl::data& key,
435 const serialization::pimpl::data& old_value,
436 const serialization::pimpl::data& new_value)
438 auto request = protocol::codec::transactionalmap_replaceifsame_encode(
440 get_transaction_id(),
441 util::get_current_thread_id(),
446 return invoke_and_get_future<bool>(request);
449 boost::future<boost::optional<serialization::pimpl::data>>
450 TransactionalMapImpl::remove_data(
const serialization::pimpl::data& key)
452 auto request = protocol::codec::transactionalmap_remove_encode(
453 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
455 return invoke_and_get_future<boost::optional<serialization::pimpl::data>>(
460 TransactionalMapImpl::delete_entry_data(
const serialization::pimpl::data& key)
462 auto request = protocol::codec::transactionalmap_delete_encode(
463 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
465 return to_void_future(invoke(request));
469 TransactionalMapImpl::remove_data(
const serialization::pimpl::data& key,
470 const serialization::pimpl::data& value)
472 auto request = protocol::codec::transactionalmap_removeifsame_encode(
474 get_transaction_id(),
475 util::get_current_thread_id(),
479 return invoke_and_get_future<bool>(request);
482 boost::future<std::vector<serialization::pimpl::data>>
483 TransactionalMapImpl::key_set_data()
485 auto request = protocol::codec::transactionalmap_keyset_encode(
486 get_name(), get_transaction_id(), util::get_current_thread_id());
488 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
492 boost::future<std::vector<serialization::pimpl::data>>
493 TransactionalMapImpl::key_set_data(
const serialization::pimpl::data& predicate)
495 auto request = protocol::codec::transactionalmap_keysetwithpredicate_encode(
497 get_transaction_id(),
498 util::get_current_thread_id(),
501 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
505 boost::future<std::vector<serialization::pimpl::data>>
506 TransactionalMapImpl::values_data()
508 auto request = protocol::codec::transactionalmap_values_encode(
509 get_name(), get_transaction_id(), util::get_current_thread_id());
511 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
515 boost::future<std::vector<serialization::pimpl::data>>
516 TransactionalMapImpl::values_data(
const serialization::pimpl::data& predicate)
518 auto request = protocol::codec::transactionalmap_valueswithpredicate_encode(
520 get_transaction_id(),
521 util::get_current_thread_id(),
524 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
528 TransactionalMultiMapImpl::TransactionalMultiMapImpl(
529 const std::string& name,
530 txn::TransactionProxy& transaction_proxy)
531 : TransactionalObject(multi_map::SERVICE_NAME, name, transaction_proxy)
535 TransactionalMultiMapImpl::put_data(
const serialization::pimpl::data& key,
536 const serialization::pimpl::data& value)
538 auto request = protocol::codec::transactionalmultimap_put_encode(
540 get_transaction_id(),
541 util::get_current_thread_id(),
545 return invoke_and_get_future<bool>(request);
548 boost::future<std::vector<serialization::pimpl::data>>
549 TransactionalMultiMapImpl::get_data(
const serialization::pimpl::data& key)
551 auto request = protocol::codec::transactionalmultimap_get_encode(
552 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
554 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
559 TransactionalMultiMapImpl::remove(
const serialization::pimpl::data& key,
560 const serialization::pimpl::data& value)
562 auto request = protocol::codec::transactionalmultimap_removeentry_encode(
564 get_transaction_id(),
565 util::get_current_thread_id(),
569 return invoke_and_get_future<bool>(request);
572 boost::future<std::vector<serialization::pimpl::data>>
573 TransactionalMultiMapImpl::remove_data(
const serialization::pimpl::data& key)
575 auto request = protocol::codec::transactionalmultimap_remove_encode(
576 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
578 return invoke_and_get_future<std::vector<serialization::pimpl::data>>(
583 TransactionalMultiMapImpl::value_count(
const serialization::pimpl::data& key)
585 auto request = protocol::codec::transactionalmultimap_valuecount_encode(
586 get_name(), get_transaction_id(), util::get_current_thread_id(), key);
588 return invoke_and_get_future<int>(request);
592 TransactionalMultiMapImpl::size()
594 auto request = protocol::codec::transactionalmultimap_size_encode(
595 get_name(), get_transaction_id(), util::get_current_thread_id());
597 return invoke_and_get_future<int>(request);
600 TransactionalListImpl::TransactionalListImpl(
const std::string& object_name,
601 txn::TransactionProxy& context)
602 : TransactionalObject(ilist::SERVICE_NAME, object_name, context)
606 TransactionalListImpl::add(
const serialization::pimpl::data& e)
608 auto request = protocol::codec::transactionallist_add_encode(
609 get_name(), get_transaction_id(), util::get_current_thread_id(), e);
611 return invoke_and_get_future<bool>(request);
615 TransactionalListImpl::remove(
const serialization::pimpl::data& e)
617 auto request = protocol::codec::transactionallist_remove_encode(
618 get_name(), get_transaction_id(), util::get_current_thread_id(), e);
620 return invoke_and_get_future<bool>(request);
624 TransactionalListImpl::size()
626 auto request = protocol::codec::transactionallist_size_encode(
627 get_name(), get_transaction_id(), util::get_current_thread_id());
629 return invoke_and_get_future<int>(request);
632 TransactionalSetImpl::TransactionalSetImpl(
633 const std::string& name,
634 txn::TransactionProxy& transaction_proxy)
635 : TransactionalObject(iset::SERVICE_NAME, name, transaction_proxy)
639 TransactionalSetImpl::add_data(
const serialization::pimpl::data& e)
641 auto request = protocol::codec::transactionalset_add_encode(
642 get_name(), get_transaction_id(), util::get_current_thread_id(), e);
644 return invoke_and_get_future<bool>(request);
648 TransactionalSetImpl::remove_data(
const serialization::pimpl::data& e)
650 auto request = protocol::codec::transactionalset_remove_encode(
651 get_name(), get_transaction_id(), util::get_current_thread_id(), e);
653 return invoke_and_get_future<bool>(request);
657 TransactionalSetImpl::size()
659 auto request = protocol::codec::transactionalset_size_encode(
660 get_name(), get_transaction_id(), util::get_current_thread_id());
662 return invoke_and_get_future<int>(request);
665 TransactionalObject::TransactionalObject(
const std::string& service_name,
666 const std::string& object_name,
667 txn::TransactionProxy& context)
668 : proxy::SerializingProxy(context.get_client_context(), object_name)
669 , service_name_(service_name)
674 TransactionalObject::~TransactionalObject() =
default;
677 TransactionalObject::get_service_name()
679 return service_name_;
683 TransactionalObject::get_name()
689 TransactionalObject::destroy()
693 protocol::codec::client_destroyproxy_encode(name_, service_name_);
694 return to_void_future(
695 invoke_on_connection(request, context_.get_connection()));
699 TransactionalObject::on_destroy()
703 TransactionalObject::get_transaction_id()
const
705 return context_.get_txn_id();
708 std::chrono::milliseconds
709 TransactionalObject::get_timeout()
const
711 return context_.get_timeout();
715 transaction_context::transaction_context(
716 spi::impl::ClientTransactionManagerServiceImpl& transaction_manager,
718 : options_(txn_options)
719 , txn_connection_(transaction_manager.connect())
720 , transaction_(options_, transaction_manager.get_client(), txn_connection_)
726 return transaction_.get_txn_id();
732 return transaction_.begin();
738 return transaction_.commit();
744 return transaction_.rollback();
748 : timeout_(std::chrono::minutes(2))
756 return transaction_type_;
762 transaction_type_ = type;
766 std::chrono::milliseconds
775 if (duration.count() <= 0) {
776 BOOST_THROW_EXCEPTION(exception::illegal_state(
777 "TransactionOptions::setTimeout",
"Timeout must be positive!"));
792 if (num_machines < 0) {
793 BOOST_THROW_EXCEPTION(
794 exception::illegal_state(
"TransactionOptions::setDurability",
795 "Durability cannot be negative!"));
797 this->durability_ = num_machines;
805 hash<std::pair<std::string, std::string>>::operator()(
806 const std::pair<std::string, std::string>& val)
const noexcept
808 return std::hash<std::string>{}(val.first + val.second);
boost::future< void > begin_transaction()
Begins a transaction.
boost::future< void > commit_transaction()
Commits a transaction.
boost::future< void > rollback_transaction()
Begins a transaction.
boost::uuids::uuid get_txn_id() const
Contains the configuration for a Hazelcast transaction.
transaction_options & set_timeout(std::chrono::milliseconds duration)
The timeout determines the maximum lifespan of a transaction.
transaction_options & set_transaction_type(transaction_type transaction_type)
Sets the TransactionType.
transaction_type get_transaction_type() const
std::chrono::milliseconds get_timeout() const
transaction_options & set_durability(int num_machines)
Sets the transaction durability.
transaction_type
Transaction type.
transaction_options()
Creates a new default configured TransactionsOptions.
int get_durability() const