17 #include <boost/uuid/random_generator.hpp>
19 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
20 #include "hazelcast/client/protocol/codec/codecs.h"
21 #include "hazelcast/client/protocol/codec/builtin/sql_page_codec.h"
22 #include "hazelcast/client/spi/ClientContext.h"
23 #include "hazelcast/client/spi/impl/ClientInvocation.h"
24 #include "hazelcast/client/sql/impl/query_id.h"
25 #include "hazelcast/client/sql/sql_column_metadata.h"
26 #include "hazelcast/client/sql/sql_column_type.h"
27 #include "hazelcast/client/sql/sql_result.h"
28 #include "hazelcast/client/sql/sql_statement.h"
29 #include "hazelcast/client/sql/hazelcast_sql_exception.h"
30 #include "hazelcast/client/sql/sql_service.h"
31 #include "hazelcast/client/hazelcast_client.h"
32 #include "hazelcast/client/sql/impl/query_utils.h"
33 #include "hazelcast/client/sql/impl/sql_error_code.h"
34 #include "hazelcast/client/sql/sql_row_metadata.h"
35 #include "hazelcast/client/sql/sql_page.h"
41 sql_service::sql_service(client::spi::ClientContext& context)
42 : client_context_(context)
47 sql_service::create_query_id(
48 const std::shared_ptr<connection::Connection>& query_conn)
50 auto local_id = client_context_.random_uuid();
51 auto member_id = query_conn->get_remote_uuid();
53 return { member_id, local_id };
56 boost::future<std::shared_ptr<sql_result>>
59 using protocol::ClientMessage;
61 auto query_conn = query_connection();
62 sql::impl::query_id qid = create_query_id(query_conn);
64 auto request = protocol::codec::sql_execute_encode(
66 statement.serialized_parameters_,
67 static_cast<int64_t
>(statement.
timeout().count()),
69 statement.
schema() ? &statement.
schema().value() :
nullptr,
74 auto invocation = spi::impl::ClientInvocation::create(
75 client_context_, request,
"", query_conn);
79 return invocation->invoke().then(
81 [
this, query_conn, qid, cursor_buffer_size](
82 boost::future<ClientMessage> response_fut) {
84 auto response = response_fut.get();
85 return handle_execute_response(
86 response, query_conn, qid, cursor_buffer_size);
87 }
catch (
const std::exception& e) {
88 rethrow(e, query_conn);
91 return std::shared_ptr<sql_result>();
96 sql_service::close(
const std::shared_ptr<connection::Connection>& connection,
99 auto close_message = protocol::codec::sql_close_encode(
id);
101 auto invocation = spi::impl::ClientInvocation::create(
102 client_context_, close_message,
"", connection);
103 return invocation->invoke().then(
104 [
this, connection](boost::future<protocol::ClientMessage> f) {
108 rethrow(e, connection);
115 sql_service::sql_execute_response_parameters
116 sql_service::decode_execute_response(protocol::ClientMessage& msg)
118 static constexpr
size_t RESPONSE_UPDATE_COUNT_FIELD_OFFSET =
119 protocol::ClientMessage::RESPONSE_HEADER_LEN;
120 static constexpr
size_t RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET =
121 RESPONSE_UPDATE_COUNT_FIELD_OFFSET + protocol::ClientMessage::INT64_SIZE;
123 sql_execute_response_parameters response;
125 auto initial_frame_header = msg.read_frame_header();
126 msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN -
127 protocol::ClientMessage::SIZE_OF_FRAME_LENGTH_AND_FLAGS);
129 response.update_count = msg.get<int64_t>();
131 auto frame_len =
static_cast<int32_t
>(initial_frame_header.frame_len);
133 static_cast<int32_t
>(RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET +
134 protocol::ClientMessage::BOOL_SIZE)) {
135 response.is_infinite_rows = msg.get<
bool>();
136 response.is_infinite_rows_exist =
true;
138 msg.rd_ptr(frame_len -
139 static_cast<int32_t
>(RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET +
140 protocol::ClientMessage::BOOL_SIZE));
142 response.is_infinite_rows_exist =
false;
144 msg.rd_ptr(
static_cast<std::size_t
>(frame_len) -
145 RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET);
148 auto column_metadata = msg.get_nullable<std::vector<sql_column_metadata>>();
149 if (column_metadata) {
150 response.row_metadata = std::make_shared<sql_row_metadata>(std::move(column_metadata.value()));
152 auto row_metadata = response.row_metadata;
153 auto page = msg.get_nullable<std::shared_ptr<sql::sql_page>>(
154 [row_metadata](protocol::ClientMessage& msg) {
155 return protocol::codec::builtin::sql_page_codec::decode(msg,
159 response.first_page = *std::move(page);
162 response.error = msg.get_nullable<impl::sql_error>();
167 sql_service::sql_fetch_response_parameters
168 sql_service::decode_fetch_response(protocol::ClientMessage message)
171 message.skip_frame();
174 message.get_nullable<std::shared_ptr<sql::sql_page>>([](protocol::ClientMessage& msg) {
175 return protocol::codec::builtin::sql_page_codec::decode(msg);
177 auto error = message.get<boost::optional<impl::sql_error>>();
178 return { std::move(page), std::move(error) };
181 std::shared_ptr<sql_result>
182 sql_service::handle_execute_response(
183 protocol::ClientMessage& msg,
184 std::shared_ptr<connection::Connection> connection,
186 int32_t cursor_buffer_size)
188 auto response = decode_execute_response(msg);
189 if (response.error) {
190 BOOST_THROW_EXCEPTION(sql::hazelcast_sql_exception(
191 "sql_service::handle_execute_response",
192 std::move(response.error->originating_member_id),
193 response.error->code,
194 std::move(response.error->message),
195 std::move(response.error->suggestion)));
198 return std::shared_ptr<sql_result>(
199 new sql_result(&client_context_,
201 std::move(connection),
203 response.update_count,
204 std::move(response.row_metadata),
205 std::move(response.first_page),
206 cursor_buffer_size));
209 std::shared_ptr<connection::Connection>
210 sql_service::query_connection()
212 std::shared_ptr<connection::Connection> connection;
214 auto& cs = client_context_.get_client_cluster_service();
216 client_context_.get_connection_manager().connection_for_sql(
218 return impl::query_utils::member_of_same_larger_version_group(
219 cs.get_member_list());
221 [&](boost::uuids::uuid id) {
return cs.get_member(
id); });
225 throw exception::query(
226 static_cast<int32_t
>(
227 impl::sql_error_code::CONNECTION_PROBLEM),
228 "Client is not connected");
229 }
catch (
const exception::query& e) {
234 }
catch (
const std::exception& e) {
242 sql_service::rethrow(
const std::exception& e)
246 std::rethrow_if_nested(e);
247 }
catch (exception::access_control&) {
250 impl::query_utils::throw_public_exception(std::current_exception(),
254 impl::query_utils::throw_public_exception(std::current_exception(),
259 sql_service::rethrow(
const std::exception& cause,
260 const std::shared_ptr<connection::Connection>& connection)
262 if (!connection->is_alive()) {
263 auto msg = (boost::format(
"Cluster topology changed while a query was "
264 "executed: Member cannot be reached: %1%") %
265 connection->get_remote_address())
267 return impl::query_utils::throw_public_exception(
268 std::make_exception_ptr(exception::query(
269 static_cast<int32_t
>(impl::sql_error_code::CONNECTION_PROBLEM),
271 std::make_exception_ptr(cause))),
275 return rethrow(cause);
279 sql_service::client_id()
281 return client_context_.get_connection_manager().get_client_uuid();
284 boost::future<std::shared_ptr<sql_page>>
285 sql_service::fetch_page(
286 const impl::query_id& q_id,
287 int32_t cursor_buffer_size,
288 const std::shared_ptr<connection::Connection>& connection)
290 auto request_message =
291 protocol::codec::sql_fetch_encode(q_id, cursor_buffer_size);
293 auto response_future = spi::impl::ClientInvocation::create(
294 client_context_, request_message,
"", connection)
297 return response_future.then(
298 boost::launch::sync, [
this](boost::future<protocol::ClientMessage> f) {
299 std::shared_ptr<sql_page> page;
301 auto response_message = f.get();
303 sql_fetch_response_parameters response_params =
304 this->decode_fetch_response(std::move(response_message));
306 hazelcast::client::sql::sql_service::handle_fetch_response_error(
307 std::move(response_params.error));
309 page = std::move(*response_params.page);
310 }
catch (exception::iexception&) {
311 impl::query_utils::throw_public_exception(
312 std::current_exception(), this->client_id());
320 sql_service::handle_fetch_response_error(boost::optional<impl::sql_error> error)
323 throw hazelcast_sql_exception(
324 "sql_service::handle_fetch_response_error",
325 error->originating_member_id,
332 constexpr std::chrono::milliseconds sql_statement::TIMEOUT_NOT_SET;
333 constexpr std::chrono::milliseconds sql_statement::TIMEOUT_DISABLED;
334 constexpr std::chrono::milliseconds sql_statement::DEFAULT_TIMEOUT;
335 constexpr int32_t sql_statement::DEFAULT_CURSOR_BUFFER_SIZE;
338 : serialized_parameters_{}
339 , cursor_buffer_size_{ DEFAULT_CURSOR_BUFFER_SIZE }
340 , timeout_{ TIMEOUT_NOT_SET }
341 , expected_result_type_{ sql_expected_result_type::any }
343 , serialization_service_(
344 spi::ClientContext(client).get_serialization_service())
346 sql(std::move(query));
351 : serialized_parameters_{}
352 , cursor_buffer_size_{ DEFAULT_CURSOR_BUFFER_SIZE }
353 , timeout_{ TIMEOUT_NOT_SET }
354 , expected_result_type_{ sql_expected_result_type::any }
356 , serialization_service_(client_context.get_serialization_service())
358 sql(std::move(query));
370 util::Preconditions::check_not_empty(sql_string,
"SQL cannot be empty");
372 sql_ = std::move(sql_string);
380 serialized_parameters_.clear();
388 return cursor_buffer_size_;
394 util::Preconditions::check_positive(
396 (boost::format(
"Cursor buffer size must be positive: %s") % size).str());
397 cursor_buffer_size_ = size;
402 std::chrono::milliseconds
411 auto timeout_msecs =
timeout.count();
413 throw exception::illegal_argument(
414 "sql_statement::timeout(std::chrono::milliseconds timeout)",
415 (boost::format(
"Timeout must be non-negative or -1: %1% msecs") %
425 const boost::optional<std::string>&
434 schema_ = std::move(
schema);
438 sql::sql_expected_result_type
441 return expected_result_type_;
447 expected_result_type_ = type;
451 hazelcast_sql_exception::hazelcast_sql_exception(
453 boost::uuids::uuid originating_member_id,
455 boost::optional<std::string> message,
456 boost::optional<std::string> suggestion,
457 std::exception_ptr cause)
458 : hazelcast_(std::move(source),
459 message ? std::move(message).value() :
"",
462 , originating_member_id_(std::move(originating_member_id))
464 , suggestion_(std::move(suggestion))
468 const boost::uuids::uuid&
471 return originating_member_id_;
480 const boost::optional<std::string>&
488 std::ostream& operator<<(std::ostream& os,
const query_id&
id)
490 os <<
"query_id{member_id: " << boost::uuids::to_string(
id.member_id)
491 <<
" local_id: " << boost::uuids::to_string(
id.local_id) <<
"}";
496 query_utils::throw_public_exception(std::exception_ptr exc,
497 boost::uuids::uuid
id)
500 std::rethrow_exception(exc);
501 }
catch (hazelcast_sql_exception& e) {
503 }
catch (exception::query& e) {
504 auto originating_member_id = e.originating_member_uuid();
505 if (originating_member_id.is_nil()) {
506 originating_member_id = id;
509 throw hazelcast_sql_exception(
"query_utils::throw_public_exception",
510 originating_member_id,
515 }
catch (exception::iexception& ie) {
516 throw hazelcast_sql_exception(
517 "query_utils::throw_public_exception",
519 static_cast<int32_t
>(sql_error_code::GENERIC),
526 boost::optional<member>
527 query_utils::member_of_same_larger_version_group(
528 const std::vector<member>& members)
534 boost::optional<member::version> version0;
535 boost::optional<member::version> version1;
539 for (
const auto& m : members) {
540 if (m.is_lite_member()) {
543 auto v = m.get_version();
544 if (!version0 || *version0 == v) {
547 }
else if (!version1 || *version1 == v) {
551 throw exception::runtime(
552 "query_utils::member_of_same_larger_version_group",
554 "More than 2 distinct member versions found: %1% , %2%") %
560 assert(count1 == 0 || count0 > 0);
568 member::version version;
569 if (count0 > count1 || (count0 == count1 && *version0 > *version1)) {
578 static thread_local std::mt19937 generator;
579 std::uniform_int_distribution<int> distribution(0, count - 1);
580 auto random_member_index = distribution(generator);
581 for (
const auto& m : members) {
582 if (!m.is_lite_member() && m.get_version() == version) {
583 random_member_index--;
584 if (random_member_index < 0) {
590 throw exception::runtime(
"query_utils::member_of_same_larger_version_group",
591 "should never get here");
596 sql_result::sql_result(
597 spi::ClientContext* client_context,
598 sql_service* service,
599 std::shared_ptr<connection::Connection> connection,
601 int64_t update_count,
602 std::shared_ptr<sql_row_metadata> row_metadata,
603 std::shared_ptr<sql_page> first_page,
604 int32_t cursor_buffer_size)
605 : client_context_(client_context)
607 , connection_(std::move(connection))
609 , update_count_(update_count)
610 , row_metadata_(std::move(row_metadata))
611 , first_page_(std::move(first_page))
612 , iterator_requested_(false)
614 , cursor_buffer_size_(cursor_buffer_size)
618 first_page_->row_metadata(row_metadata_);
619 first_page_->serialization_service(
620 &client_context_->get_serialization_service());
630 return update_count_;
645 BOOST_THROW_EXCEPTION(exception::illegal_state(
646 "sql_result::iterator",
"This result contains only update count"));
649 if (iterator_requested_) {
650 BOOST_THROW_EXCEPTION(exception::illegal_state(
651 "sql_result::page_iterator",
"Iterator can be requested only once"));
654 iterator_requested_ =
true;
656 return { shared_from_this(), first_page_ };
660 sql_result::check_closed()
const
663 impl::query_utils::throw_public_exception(
665 static_cast<int32_t
>(impl::sql_error_code::CANCELLED_BY_USER),
666 "Query was cancelled by the user")),
667 service_->client_id());
675 return boost::make_ready_future();
678 auto release_resources = [
this](){
680 std::lock_guard<std::mutex> guard{ mtx_ };
686 row_metadata_.reset();
692 auto f = service_->close(connection_, query_id_);
697 }
catch (
const std::exception& e) {
700 service_->rethrow(e);
704 return boost::make_ready_future();
707 boost::future<std::shared_ptr<sql_page>>
708 sql_result::fetch_page()
710 std::lock_guard<std::mutex> guard{ mtx_ };
713 return service_->fetch_page(query_id_, cursor_buffer_size_, connection_);
716 const sql_row_metadata&
719 if (!row_metadata_) {
720 throw exception::illegal_state(
721 "sql_result::row_metadata",
"This result contains only update count");
724 return *row_metadata_;
733 HZ_LOG(client_context_->get_logger(),
735 (boost::format(
"[sql_result::~sql_result()] Exception while "
736 "closing the query result. Query id: %1%") %
742 sql_result::page_iterator::page_iterator(std::shared_ptr<sql_result> result,
743 std::shared_ptr<sql_page> first_page)
744 : in_progress_{ std::make_shared<std::atomic<bool>>(false) }
745 , last_{ std::make_shared<std::atomic<bool>>(false) }
746 , row_metadata_{ result->row_metadata_ }
747 , serialization_(&result->client_context_->get_serialization_service())
748 , result_{ move(result) }
749 , first_page_(move(first_page))
753 boost::future<std::shared_ptr<sql_page>>
756 result_->check_closed();
759 auto page = move(first_page_);
761 page->serialization_service(serialization_);
762 page->row_metadata(row_metadata_);
763 *last_ = page->last();
765 return boost::make_ready_future<std::shared_ptr<sql_page>>(page);
769 BOOST_THROW_EXCEPTION(
770 exception::illegal_access(
"sql_result::page_iterator::next",
771 "Fetch page operation is already in "
772 "progress so next must not be called."));
776 BOOST_THROW_EXCEPTION(exception::no_such_element(
777 "sql_result::page_iterator::next",
778 "Last page is already retrieved so there are no more pages."));
781 *in_progress_ =
true;
783 auto page_future = result_->fetch_page();
785 std::weak_ptr<std::atomic<bool>> last_w{ last_ };
786 std::weak_ptr<std::atomic<bool>> in_progress_w{ in_progress_ };
787 std::shared_ptr<sql_row_metadata>
row_metadata{ row_metadata_ };
788 auto result = result_;
789 auto serialization_service = serialization_;
791 return page_future.then(
793 [serialization_service,
row_metadata, last_w, in_progress_w, result](
794 boost::future<std::shared_ptr<sql_page>> page_f) {
796 auto page = page_f.get();
798 result->check_closed();
799 page->serialization_service(serialization_service);
802 auto last = last_w.lock();
805 *last = page->last();
807 auto in_progress = in_progress_w.lock();
810 *in_progress =
false;
814 auto in_progress = in_progress_w.lock();
817 *in_progress =
false;
827 result_->check_closed();
832 sql_page::page_data::column_count()
const
834 return column_types_.size();
838 sql_page::page_data::row_count()
const
840 return columns_[0].size();
843 sql_page::sql_row::sql_row(
size_t row_index,
844 std::shared_ptr<page_data> shared)
845 : row_index_(row_index)
846 , page_data_(std::move(shared))
851 sql_page::sql_row::resolve_index(
const std::string& column_name)
const
853 auto it = page_data_->row_metadata_->find_column(column_name);
854 if (it == page_data_->row_metadata_->end()) {
855 throw exception::illegal_argument(
856 "sql_page::get_object(const std::string &)",
857 (boost::format(
"Column %1% doesn't exist") % column_name).str());
859 auto column_index = it->second;
863 const sql_row_metadata&
866 return *page_data_->row_metadata_;
870 sql_page::sql_row::check_index(
size_t index)
const
873 throw exception::index_out_of_bounds(
874 "sql_page::sql_row::check_index",
875 (boost::format(
"Column index is out of range: %1%") % index).str());
880 std::vector<column> columns,
883 : page_data_{ new page_data{ std::move(column_types),
892 sql_page::construct_rows()
896 for (
size_t i = 0; i < count; ++i) {
897 rows_.emplace_back(i, page_data_);
901 const std::vector<sql_column_type>&
904 return page_data_->column_types_;
916 return page_data_->column_count();
922 return page_data_->row_count();
925 const std::vector<sql_page::sql_row>&
932 sql_page::row_metadata(std::shared_ptr<sql_row_metadata> row_meta)
934 page_data_->row_metadata_ = std::move(row_meta);
938 sql_page::serialization_service(serialization::pimpl::SerializationService* ss)
940 page_data_->serialization_service_ = ss;
943 sql_row_metadata::sql_row_metadata(std::vector<sql_column_metadata> columns)
944 : columns_(std::move(columns))
946 assert(!columns_.empty());
948 name_to_index_.reserve(columns_.size());
949 for (std::size_t i = 0; i < columns_.size(); ++i) {
950 name_to_index_.emplace(columns_[i].name, i);
957 return columns_.size();
963 if (index >= columns_.size()) {
964 throw exception::index_out_of_bounds(
965 "sql_row_metadata::column(std::size_t index)",
966 (boost::format(
"Column index is out of bounds: %1%") % index).str());
969 return columns_[index];
972 const std::vector<sql_column_metadata>&
981 return name_to_index_.find(column_name);
987 return name_to_index_.end();
992 return lhs.columns_ == rhs.columns_;
996 operator==(
const sql_column_metadata& lhs,
const sql_column_metadata& rhs)
998 return lhs.name == rhs.name && lhs.type == rhs.type &&
999 lhs.nullable == rhs.nullable;
Base class for all exception originated from Hazelcast methods.
int32_t code() const
Gets the internal error code associated with the exception.
const boost::uuids::uuid & originating_member_id() const
Gets ID of the member that caused or initiated an error condition.
const boost::optional< std::string > & suggestion() const
Gets the suggested SQL statement to remediate experienced error.
const sql_row_metadata & row_metadata() const
Gets the row metadata.
std::size_t row_count() const
Returns the number of rows in this page.
sql_page(std::vector< sql_column_type > column_types, std::vector< column > columns, bool last, std::shared_ptr< sql_row_metadata > row_metadata=nullptr)
Constructs an sql_page from the response returned from the server.
std::size_t column_count() const
Returns the number of columns in each row.
const std::vector< sql_row > & rows() const
Returns the rows of this page.
const std::vector< sql_column_type > & column_types() const
Returns the types of the columns in each row.
bool has_next() const
Tells whether there are pages to be retrieved.
boost::future< std::shared_ptr< sql_page > > next()
Fetches the new page.
bool row_set() const
Return whether this result has rows to iterate using the iterator() method.
const sql_row_metadata & row_metadata() const
Gets the row metadata.
page_iterator iterator()
Returns an iterator over the result pages.
virtual ~sql_result()
The destructor closes the result if it were open.
int64_t update_count() const
Returns the number of rows updated by the statement or -1 if this result is a row set.
boost::future< void > close()
Release the resources associated with the query result.
Definition of an SQL statement.
const boost::optional< std::string > & schema() const
Gets the schema name.
std::chrono::milliseconds timeout() const
Gets the execution timeout in milliseconds.
sql_statement & clear_parameters()
Clears statement parameter values.
sql_statement(hazelcast_client &client, std::string query)
Creates a statement with the given query.
int32_t cursor_buffer_size() const
Gets the cursor buffer size (measured in the number of rows).
const std::string & sql() const
static constexpr std::chrono::milliseconds TIMEOUT_NOT_SET
Value for the timeout that is not set.
sql_expected_result_type expected_result_type() const
Gets the expected result type.