17#include <boost/uuid/random_generator.hpp>
19#include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
20#include "hazelcast/client/protocol/codec/codecs.h"
21#include "hazelcast/client/protocol/codec/builtin/sql_page_codec.h"
22#include "hazelcast/client/spi/ClientContext.h"
23#include "hazelcast/client/spi/impl/ClientInvocation.h"
24#include "hazelcast/client/sql/impl/query_id.h"
25#include "hazelcast/client/sql/sql_column_metadata.h"
26#include "hazelcast/client/sql/sql_column_type.h"
27#include "hazelcast/client/sql/sql_result.h"
28#include "hazelcast/client/sql/sql_statement.h"
29#include "hazelcast/client/sql/hazelcast_sql_exception.h"
30#include "hazelcast/client/sql/sql_service.h"
31#include "hazelcast/client/hazelcast_client.h"
32#include "hazelcast/client/sql/impl/query_utils.h"
33#include "hazelcast/client/sql/impl/sql_error_code.h"
34#include "hazelcast/client/sql/sql_row_metadata.h"
35#include "hazelcast/client/sql/sql_page.h"
41sql_service::sql_service(client::spi::ClientContext& context)
42 : client_context_(context)
44 is_smart_routing_ = client_context_.get_client_config()
48 const auto& client_properties = client_context_.get_client_properties();
49 const int32_t partition_arg_cache_size = client_properties.get_integer(
50 client_properties.partition_arg_cache_size());
51 const int32_t partition_arg_cache_threshold =
52 partition_arg_cache_size + std::min(partition_arg_cache_size / 10, 50);
53 partition_argument_index_cache_ =
54 std::make_shared<impl::read_optimized_lru_cache<std::string, int32_t>>(
55 partition_arg_cache_size, partition_arg_cache_threshold);
59sql_service::create_query_id(
60 const std::shared_ptr<connection::Connection>& query_conn)
62 auto local_id = client_context_.random_uuid();
63 auto member_id = query_conn->get_remote_uuid();
65 return { member_id, local_id };
68boost::future<std::shared_ptr<sql_result>>
71 using protocol::ClientMessage;
74 int32_t statement_par_arg_index = statement_par_arg_index_ptr !=
nullptr
75 ? statement_par_arg_index_ptr->load()
78 auto arg_index = statement_par_arg_index != -1
79 ? statement_par_arg_index
80 : (partition_argument_index_cache_->get_or_default(
81 statement.
sql(), -1));
83 auto partition_id = extract_partition_id(statement, arg_index);
84 std::shared_ptr<connection::Connection> query_conn =
85 partition_id ? query_connection(partition_id.value())
88 sql::impl::query_id qid = create_query_id(query_conn);
90 auto request = protocol::codec::sql_execute_encode(
92 statement.serialized_parameters_,
93 static_cast<int64_t
>(statement.
timeout().count()),
95 statement.
schema() ? &statement.
schema().value() :
nullptr,
100 auto invocation = spi::impl::ClientInvocation::create(
101 client_context_, request,
"", query_conn);
105 std::weak_ptr<std::atomic<int32_t>> statement_par_arg_index_weak_ptr =
106 statement_par_arg_index_ptr;
108 auto sql_query = statement.
sql();
109 return invocation->invoke().then(
117 statement_par_arg_index_weak_ptr](
118 boost::future<ClientMessage> response_fut) {
120 auto response = response_fut.get();
121 return handle_execute_response(sql_query,
127 statement_par_arg_index_weak_ptr);
128 }
catch (
const std::exception& e) {
129 rethrow(e, query_conn);
132 return std::shared_ptr<sql_result>();
137sql_service::close(
const std::shared_ptr<connection::Connection>& connection,
140 auto close_message = protocol::codec::sql_close_encode(
id);
142 auto invocation = spi::impl::ClientInvocation::create(
143 client_context_, close_message,
"", connection);
144 return invocation->invoke().then(
145 [
this, connection](boost::future<protocol::ClientMessage> f) {
149 rethrow(e, connection);
156sql_service::sql_execute_response_parameters
157sql_service::decode_execute_response(protocol::ClientMessage& msg)
159 static constexpr size_t RESPONSE_UPDATE_COUNT_FIELD_OFFSET =
160 protocol::ClientMessage::RESPONSE_HEADER_LEN;
161 static constexpr size_t RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET =
162 RESPONSE_UPDATE_COUNT_FIELD_OFFSET + protocol::ClientMessage::INT64_SIZE;
164 static constexpr size_t RESPONSE_PARTITION_ARGUMENT_INDEX_FIELD_OFFSET =
165 RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET +
166 protocol::ClientMessage::BOOL_SIZE;
168 sql_execute_response_parameters response;
170 auto initial_frame_header = msg.read_frame_header();
171 msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN -
172 protocol::ClientMessage::SIZE_OF_FRAME_LENGTH_AND_FLAGS);
174 response.update_count = msg.get<int64_t>();
176 auto frame_len =
static_cast<int32_t
>(initial_frame_header.frame_len);
178 static_cast<int32_t
>(RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET +
179 protocol::ClientMessage::BOOL_SIZE)) {
180 response.is_infinite_rows = msg.get<
bool>();
181 response.is_infinite_rows_exist =
true;
183 uint32_t skip_frame_len =
184 static_cast<int32_t
>(RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET +
185 protocol::ClientMessage::BOOL_SIZE);
186 if (frame_len >=
static_cast<int32_t
>(
187 RESPONSE_PARTITION_ARGUMENT_INDEX_FIELD_OFFSET +
188 protocol::ClientMessage::INT32_SIZE)) {
189 response.partition_argument_index = msg.get<int32_t>();
190 response.is_partition_argument_index_exists =
true;
191 skip_frame_len += protocol::ClientMessage::INT32_SIZE;
193 response.is_partition_argument_index_exists =
false;
197 msg.rd_ptr(frame_len - skip_frame_len);
199 response.is_infinite_rows_exist =
false;
201 msg.rd_ptr(
static_cast<std::size_t
>(frame_len) -
202 RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET);
205 auto column_metadata = msg.get_nullable<std::vector<sql_column_metadata>>();
206 if (column_metadata) {
207 response.row_metadata = std::make_shared<sql_row_metadata>(
208 std::move(column_metadata.value()));
210 auto row_metadata = response.row_metadata;
211 auto page = msg.get_nullable<std::shared_ptr<sql::sql_page>>(
212 [row_metadata](protocol::ClientMessage& msg) {
213 return protocol::codec::builtin::sql_page_codec::decode(msg,
217 response.first_page = *std::move(page);
220 response.error = msg.get_nullable<impl::sql_error>();
225sql_service::sql_fetch_response_parameters
226sql_service::decode_fetch_response(protocol::ClientMessage message)
229 message.skip_frame();
231 auto page = message.get_nullable<std::shared_ptr<sql::sql_page>>(
232 [](protocol::ClientMessage& msg) {
233 return protocol::codec::builtin::sql_page_codec::decode(msg);
235 auto error = message.get<boost::optional<impl::sql_error>>();
236 return { std::move(page), std::move(error) };
239std::shared_ptr<sql_result>
240sql_service::handle_execute_response(
241 const std::string& sql_query,
242 const int32_t original_partition_argument_index,
243 protocol::ClientMessage& msg,
244 std::shared_ptr<connection::Connection> connection,
246 int32_t cursor_buffer_size,
247 std::weak_ptr<std::atomic<int32_t>> statement_par_arg_index_ptr)
249 auto response = decode_execute_response(msg);
250 if (response.error) {
251 BOOST_THROW_EXCEPTION(sql::hazelcast_sql_exception(
252 "sql_service::handle_execute_response",
253 std::move(response.error->originating_member_id),
254 response.error->code,
255 std::move(response.error->message),
256 std::move(response.error->suggestion)));
258 if (is_smart_routing_ && response.partition_argument_index !=
259 original_partition_argument_index) {
260 if (response.partition_argument_index != -1) {
261 partition_argument_index_cache_->put(
262 sql_query, response.partition_argument_index);
264 if (
auto argument_index = statement_par_arg_index_ptr.lock()) {
265 argument_index->store(response.partition_argument_index);
268 partition_argument_index_cache_->remove(sql_query);
273 return std::shared_ptr<sql_result>(
274 new sql_result(&client_context_,
276 std::move(connection),
278 response.update_count,
279 std::move(response.row_metadata),
280 std::move(response.first_page),
281 cursor_buffer_size));
284std::shared_ptr<connection::Connection>
285sql_service::query_connection()
287 std::shared_ptr<connection::Connection> connection;
289 auto& cs = client_context_.get_client_cluster_service();
291 client_context_.get_connection_manager().connection_for_sql(
293 return impl::query_utils::member_of_same_larger_version_group(
294 cs.get_member_list());
296 [&](boost::uuids::uuid id) {
return cs.get_member(
id); });
300 throw exception::query(
301 static_cast<int32_t
>(
302 impl::sql_error_code::CONNECTION_PROBLEM),
303 "Client is not connected");
304 }
catch (
const exception::query& e) {
309 }
catch (
const std::exception& e) {
316std::shared_ptr<connection::Connection>
317sql_service::query_connection(int32_t partition_id)
319 std::shared_ptr<connection::Connection> connection;
322 client_context_.get_partition_service().get_partition_owner(
325 if (node_id.is_nil()) {
326 return query_connection();
330 client_context_.get_connection_manager().get_connection(node_id);
332 if (connection ==
nullptr) {
333 return query_connection();
335 }
catch (
const std::exception& e) {
342boost::optional<int32_t>
343sql_service::extract_partition_id(
const sql_statement& statement,
344 int32_t arg_index)
const
346 if (!is_smart_routing_) {
350 if (statement.serialized_parameters_.size() == 0) {
355 static_cast<int32_t
>(statement.serialized_parameters_.size()) ||
360 const auto& key = statement.serialized_parameters_[arg_index];
362 return client_context_.get_partition_service().get_partition_id(key);
366sql_service::rethrow(
const std::exception& e)
370 std::rethrow_if_nested(e);
371 }
catch (exception::access_control&) {
374 impl::query_utils::throw_public_exception(std::current_exception(),
378 impl::query_utils::throw_public_exception(std::current_exception(),
383sql_service::rethrow(
const std::exception& cause,
384 const std::shared_ptr<connection::Connection>& connection)
386 if (!connection->is_alive()) {
387 auto msg = (boost::format(
"Cluster topology changed while a query was "
388 "executed: Member cannot be reached: %1%") %
389 connection->get_remote_address())
391 return impl::query_utils::throw_public_exception(
392 std::make_exception_ptr(exception::query(
393 static_cast<int32_t
>(impl::sql_error_code::CONNECTION_PROBLEM),
395 std::make_exception_ptr(cause))),
399 return rethrow(cause);
403sql_service::client_id()
405 return client_context_.get_connection_manager().get_client_uuid();
408boost::future<std::shared_ptr<sql_page>>
409sql_service::fetch_page(
410 const impl::query_id& q_id,
411 int32_t cursor_buffer_size,
412 const std::shared_ptr<connection::Connection>& connection)
414 auto request_message =
415 protocol::codec::sql_fetch_encode(q_id, cursor_buffer_size);
417 auto response_future = spi::impl::ClientInvocation::create(
418 client_context_, request_message,
"", connection)
421 return response_future.then(
422 boost::launch::sync, [
this](boost::future<protocol::ClientMessage> f) {
423 std::shared_ptr<sql_page> page;
425 auto response_message = f.get();
427 sql_fetch_response_parameters response_params =
428 this->decode_fetch_response(std::move(response_message));
430 hazelcast::client::sql::sql_service::handle_fetch_response_error(
431 std::move(response_params.error));
433 page = std::move(*response_params.page);
434 }
catch (exception::iexception&) {
435 impl::query_utils::throw_public_exception(
436 std::current_exception(), this->client_id());
444sql_service::handle_fetch_response_error(boost::optional<impl::sql_error> error)
447 throw hazelcast_sql_exception(
448 "sql_service::handle_fetch_response_error",
449 error->originating_member_id,
456constexpr std::chrono::milliseconds sql_statement::TIMEOUT_NOT_SET;
457constexpr std::chrono::milliseconds sql_statement::TIMEOUT_DISABLED;
458constexpr std::chrono::milliseconds sql_statement::DEFAULT_TIMEOUT;
459constexpr int32_t sql_statement::DEFAULT_CURSOR_BUFFER_SIZE;
462 : serialized_parameters_{}
465 , expected_result_type_{ sql_expected_result_type::any }
467 , partition_argument_index_{
std::make_shared<
std::atomic<int32_t>>(-1) }
468 , serialization_service_(
469 spi::ClientContext(client).get_serialization_service())
471 sql(std::move(query));
476 : serialized_parameters_{}
477 , cursor_buffer_size_{ DEFAULT_CURSOR_BUFFER_SIZE }
478 , timeout_{ TIMEOUT_NOT_SET }
479 , expected_result_type_{ sql_expected_result_type::any }
481 , partition_argument_index_{ nullptr }
482 , serialization_service_(client_context.get_serialization_service())
484 sql(std::move(query));
496 util::Preconditions::check_not_empty(sql_string,
"SQL cannot be empty");
498 sql_ = std::move(sql_string);
506 serialized_parameters_.clear();
514 return cursor_buffer_size_;
520 util::Preconditions::check_positive(
522 (boost::format(
"Cursor buffer size must be positive: %s") % size).str());
523 cursor_buffer_size_ = size;
528std::chrono::milliseconds
537 auto timeout_msecs =
timeout.count();
539 throw exception::illegal_argument(
540 "sql_statement::timeout(std::chrono::milliseconds timeout)",
541 (boost::format(
"Timeout must be non-negative or -1: %1% msecs") %
551const boost::optional<std::string>&
560 schema_ = std::move(
schema);
564sql::sql_expected_result_type
567 return expected_result_type_;
573 expected_result_type_ = type;
577std::shared_ptr<std::atomic<int32_t>>
580 return partition_argument_index_;
587 BOOST_THROW_EXCEPTION(client::exception::illegal_argument(
588 "The argument index must be >=0, or -1"));
594hazelcast_sql_exception::hazelcast_sql_exception(
596 boost::uuids::uuid originating_member_id,
598 boost::optional<std::string> message,
599 boost::optional<std::string> suggestion,
600 std::exception_ptr cause)
601 : hazelcast_(
std::move(source),
602 message ?
std::move(message).value() :
"",
605 , originating_member_id_(
std::move(originating_member_id))
607 , suggestion_(
std::move(suggestion))
611const boost::uuids::uuid&
614 return originating_member_id_;
623const boost::optional<std::string>&
632operator<<(std::ostream& os,
const query_id&
id)
634 os <<
"query_id{member_id: " << boost::uuids::to_string(
id.member_id)
635 <<
" local_id: " << boost::uuids::to_string(
id.local_id) <<
"}";
640query_utils::throw_public_exception(std::exception_ptr exc,
641 boost::uuids::uuid
id)
644 std::rethrow_exception(exc);
645 }
catch (hazelcast_sql_exception& e) {
647 }
catch (exception::query& e) {
648 auto originating_member_id = e.originating_member_uuid();
649 if (originating_member_id.is_nil()) {
650 originating_member_id = id;
653 throw hazelcast_sql_exception(
"query_utils::throw_public_exception",
654 originating_member_id,
659 }
catch (exception::iexception& ie) {
660 throw hazelcast_sql_exception(
661 "query_utils::throw_public_exception",
663 static_cast<int32_t
>(sql_error_code::GENERIC),
670boost::optional<member>
671query_utils::member_of_same_larger_version_group(
672 const std::vector<member>& members)
678 boost::optional<member::version> version0;
679 boost::optional<member::version> version1;
683 for (
const auto& m : members) {
684 if (m.is_lite_member()) {
687 auto v = m.get_version();
688 if (!version0 || *version0 == v) {
691 }
else if (!version1 || *version1 == v) {
695 throw exception::runtime(
696 "query_utils::member_of_same_larger_version_group",
698 "More than 2 distinct member versions found: %1% , %2%") %
704 assert(count1 == 0 || count0 > 0);
712 member::version version;
713 if (count0 > count1 || (count0 == count1 && *version0 > *version1)) {
722 static thread_local std::mt19937 generator;
723 std::uniform_int_distribution<int> distribution(0, count - 1);
724 auto random_member_index = distribution(generator);
725 for (
const auto& m : members) {
726 if (!m.is_lite_member() && m.get_version() == version) {
727 random_member_index--;
728 if (random_member_index < 0) {
734 throw exception::runtime(
"query_utils::member_of_same_larger_version_group",
735 "should never get here");
740sql_result::sql_result(spi::ClientContext* client_context,
742 std::shared_ptr<connection::Connection> connection,
744 int64_t update_count,
745 std::shared_ptr<sql_row_metadata> row_metadata,
746 std::shared_ptr<sql_page> first_page,
747 int32_t cursor_buffer_size)
748 : client_context_(client_context)
750 , connection_(std::move(connection))
752 , update_count_(update_count)
753 , row_metadata_(std::move(row_metadata))
754 , first_page_(std::move(first_page))
755 , iterator_requested_(false)
757 , cursor_buffer_size_(cursor_buffer_size)
761 first_page_->row_metadata(row_metadata_);
762 first_page_->serialization_service(
763 &client_context_->get_serialization_service());
773 return update_count_;
788 BOOST_THROW_EXCEPTION(exception::illegal_state(
789 "sql_result::iterator",
"This result contains only update count"));
792 if (iterator_requested_) {
793 BOOST_THROW_EXCEPTION(exception::illegal_state(
794 "sql_result::page_iterator",
"Iterator can be requested only once"));
797 iterator_requested_ =
true;
799 return { shared_from_this(), first_page_ };
803sql_result::pbegin(std::chrono::milliseconds timeout)
805 return page_iterator_sync{
iterator(), timeout };
808sql_result::page_iterator_sync
811 return page_iterator_sync{};
815sql_result::begin(std::chrono::milliseconds timeout)
827sql_result::check_closed()
const
830 impl::query_utils::throw_public_exception(
831 std::make_exception_ptr(exception::query(
832 static_cast<int32_t
>(impl::sql_error_code::CANCELLED_BY_USER),
833 "Query was cancelled by the user")),
834 service_->client_id());
838sql_result::row_iterator_sync::row_iterator_sync(page_iterator_sync&& iterator)
839 : iterator_{ std::move(iterator) }
847 iterator_.set_timeout(
timeout);
850std::chrono::milliseconds
853 return iterator_.timeout();
859 while (iterator_->rows().empty()) {
863 return iterator_->rows().at(row_idx_);
878 row_idx_ >= iterator_->rows().size()) {
890 return x.iterator_ == y.iterator_;
904 return boost::make_ready_future();
907 auto release_resources = [
this]() {
909 std::lock_guard<std::mutex> guard{ mtx_ };
915 row_metadata_.reset();
920 auto f = service_->close(connection_, query_id_);
925 }
catch (
const std::exception& e) {
928 service_->rethrow(e);
932 return boost::make_ready_future();
935boost::future<std::shared_ptr<sql_page>>
936sql_result::fetch_page()
938 std::lock_guard<std::mutex> guard{ mtx_ };
941 return service_->fetch_page(query_id_, cursor_buffer_size_, connection_);
944const sql_row_metadata&
947 if (!row_metadata_) {
948 throw exception::illegal_state(
949 "sql_result::row_metadata",
"This result contains only update count");
952 return *row_metadata_;
961 HZ_LOG(client_context_->get_logger(),
963 (boost::format(
"[sql_result::~sql_result()] Exception while "
964 "closing the query result. Query id: %1%") %
970sql_result::page_iterator::page_iterator(std::shared_ptr<sql_result> result,
971 std::shared_ptr<sql_page> first_page)
972 : in_progress_{
std::make_shared<
std::atomic<bool>>(false) }
973 , last_{
std::make_shared<
std::atomic<bool>>(false) }
974 , row_metadata_{ result->row_metadata_ }
975 , serialization_(&result->client_context_->get_serialization_service())
976 , result_{
std::move(result) }
977 , first_page_(
std::move(first_page))
981boost::future<std::shared_ptr<sql_page>>
984 result_->check_closed();
987 auto page = std::move(first_page_);
989 page->serialization_service(serialization_);
990 page->row_metadata(row_metadata_);
991 *last_ = page->last();
993 return boost::make_ready_future<std::shared_ptr<sql_page>>(page);
997 BOOST_THROW_EXCEPTION(
998 exception::illegal_access(
"sql_result::page_iterator::next",
999 "Fetch page operation is already in "
1000 "progress so next must not be called."));
1004 BOOST_THROW_EXCEPTION(exception::no_such_element(
1005 "sql_result::page_iterator::next",
1006 "Last page is already retrieved so there are no more pages."));
1009 *in_progress_ =
true;
1011 auto page_future = result_->fetch_page();
1013 std::weak_ptr<std::atomic<bool>> last_w{ last_ };
1014 std::weak_ptr<std::atomic<bool>> in_progress_w{ in_progress_ };
1015 std::shared_ptr<sql_row_metadata>
row_metadata{ row_metadata_ };
1016 auto result = result_;
1017 auto serialization_service = serialization_;
1019 return page_future.then(
1020 boost::launch::sync,
1021 [serialization_service,
row_metadata, last_w, in_progress_w, result](
1022 boost::future<std::shared_ptr<sql_page>> page_f) {
1024 auto page = page_f.get();
1026 result->check_closed();
1027 page->serialization_service(serialization_service);
1030 auto last = last_w.lock();
1033 *last = page->last();
1035 auto in_progress = in_progress_w.lock();
1038 *in_progress =
false;
1042 auto in_progress = in_progress_w.lock();
1045 *in_progress =
false;
1055 result_->check_closed();
1059sql_result::page_iterator_sync::non_copyables::non_copyables(
1062 , iter_{
std::move(iter) }
1066sql_result::page_iterator_sync::page_iterator_sync(
1067 page_iterator&& iter,
1068 std::chrono::milliseconds timeout)
1069 : block_(
std::make_shared<non_copyables>(
std::move(iter)))
1070 , current_{ block_->iter_.next().get() }
1071 , timeout_{ timeout }
1073 if (block_->iter_.has_next()) {
1074 block_->preloaded_page_ = block_->iter_.next();
1084std::chrono::milliseconds
1094 return !x.block_ && !y.block_;
1104sql_result::page_iterator_sync&
1108 BOOST_THROW_EXCEPTION(exception::no_such_element(
1109 "sql_result::page_iterator_sync::operator++()",
1110 "Iterator already points to past-end element."));
1111 }
else if (current_->last()) {
1115 if (!block_->preloaded_page_.is_ready() && timeout_.count() > 0) {
1116 (void)block_->preloaded_page_.wait_for(
1117 boost::chrono::milliseconds{ timeout_.count() });
1119 if (block_->preloaded_page_.is_ready()) {
1120 current_ = block_->preloaded_page_.get();
1122 BOOST_THROW_EXCEPTION(exception::no_such_element());
1125 current_ = block_->preloaded_page_.get();
1128 if (block_->iter_.has_next()) {
1129 block_->preloaded_page_ = block_->iter_.next();
1136std::shared_ptr<sql_page>
1140 BOOST_THROW_EXCEPTION(exception::no_such_element(
1141 "sql_result::page_iterator_sync::operator++()",
1142 "Iterator points to past-end element."));
1147std::shared_ptr<sql_page>
1154sql_page::page_data::column_count()
const
1156 return column_types_.size();
1160sql_page::page_data::row_count()
const
1162 return columns_[0].size();
1165sql_page::sql_row::sql_row(
size_t row_index, std::shared_ptr<page_data> shared)
1166 : row_index_(row_index)
1167 , page_data_(
std::move(shared))
1172sql_page::sql_row::resolve_index(
const std::string& column_name)
const
1174 auto it = page_data_->row_metadata_->find_column(column_name);
1175 if (it == page_data_->row_metadata_->end()) {
1176 throw exception::illegal_argument(
1177 "sql_page::get_object(const std::string &)",
1178 (boost::format(
"Column %1% doesn't exist") % column_name).str());
1180 auto column_index = it->second;
1181 return column_index;
1184const sql_row_metadata&
1187 return *page_data_->row_metadata_;
1191sql_page::sql_row::check_index(
size_t index)
const
1194 throw exception::index_out_of_bounds(
1195 "sql_page::sql_row::check_index",
1196 (boost::format(
"Column index is out of range: %1%") % index).str());
1201 std::vector<column> columns,
1203 std::shared_ptr<sql_row_metadata> row_metadata)
1206 std::move(row_metadata),
1213sql_page::construct_rows()
1217 for (
size_t i = 0; i < count; ++i) {
1218 rows_.emplace_back(i, page_data_);
1222const std::vector<sql_column_type>&
1225 return page_data_->column_types_;
1237 return page_data_->column_count();
1243 return page_data_->row_count();
1246const std::vector<sql_page::sql_row>&
1253sql_page::row_metadata(std::shared_ptr<sql_row_metadata> row_meta)
1255 page_data_->row_metadata_ = std::move(row_meta);
1259sql_page::serialization_service(serialization::pimpl::SerializationService* ss)
1261 page_data_->serialization_service_ = ss;
1264sql_row_metadata::sql_row_metadata(std::vector<sql_column_metadata> columns)
1265 : columns_(
std::move(columns))
1267 assert(!columns_.empty());
1269 name_to_index_.reserve(columns_.size());
1270 for (std::size_t i = 0; i < columns_.size(); ++i) {
1271 name_to_index_.emplace(columns_[i].name, i);
1278 return columns_.size();
1284 if (index >= columns_.size()) {
1285 throw exception::index_out_of_bounds(
1286 "sql_row_metadata::column(std::size_t index)",
1287 (boost::format(
"Column index is out of bounds: %1%") % index).str());
1290 return columns_[index];
1293const std::vector<sql_column_metadata>&
1302 return name_to_index_.find(column_name);
1308 return name_to_index_.end();
1313 return lhs.columns_ == rhs.columns_;
1317operator==(
const sql_column_metadata& lhs,
const sql_column_metadata& rhs)
1319 return lhs.name == rhs.name && lhs.type == rhs.type &&
1320 lhs.nullable == rhs.nullable;
Base class for all exception originated from Hazelcast methods.
int32_t code() const
Gets the internal error code associated with the exception.
const boost::uuids::uuid & originating_member_id() const
Gets ID of the member that caused or initiated an error condition.
const boost::optional< std::string > & suggestion() const
Gets the suggested SQL statement to remediate experienced error.
const sql_row_metadata & row_metadata() const
Gets the row metadata.
std::size_t row_count() const
Returns the number of rows in this page.
sql_page(std::vector< sql_column_type > column_types, std::vector< column > columns, bool last, std::shared_ptr< sql_row_metadata > row_metadata=nullptr)
Constructs an sql_page from the response returned from the server.
std::size_t column_count() const
Returns the number of columns in each row.
const std::vector< sql_row > & rows() const
Returns the rows of this page.
const std::vector< sql_column_type > & column_types() const
Returns the types of the columns in each row.
Copy is allowed for convenience but it does shallow copy so it should be avoided.
std::shared_ptr< sql_page > operator*() const
Dereferences current page.
std::chrono::milliseconds timeout() const
Retrieves the timeout.
page_iterator_sync & operator++()
Fetches next page with blocking manner.
void set_timeout(std::chrono::milliseconds)
Sets timeout for page fetch operation.
std::shared_ptr< sql_page > operator->() const
Dereferences current page.
Copy is allowed for convenience but it does shallow copy so it should be avoided.
bool has_next() const
Tells whether there are pages to be retrieved.
boost::future< std::shared_ptr< sql_page > > next()
Fetches the new page.
Copy is allowed for convenience but it does shallow copy so it should be avoided.
void set_timeout(std::chrono::milliseconds)
Sets timeout for page fetch operation.
row_iterator_sync & operator++()
Fetches next row in blocking manner.
const sql_page::sql_row & operator*() const
Returns current row.
const sql_page::sql_row * operator->() const
Returns current row.
std::chrono::milliseconds timeout() const
Retrieves the timeout.
bool row_set() const
Return whether this result has rows to iterate using the iterator() method.
const sql_row_metadata & row_metadata() const
Gets the row metadata.
page_iterator iterator()
Returns an iterator over the result pages.
virtual ~sql_result()
The destructor closes the result if it were open.
int64_t update_count() const
Returns the number of rows updated by the statement or -1 if this result is a row set.
boost::future< void > close()
Release the resources associated with the query result.
A service to execute SQL statements.
boost::future< std::shared_ptr< sql_result > > execute(const std::string &query, const Params &... params)
Convenient method to execute a distributed query with the given parameter values.
Definition of an SQL statement.
std::shared_ptr< std::atomic< int32_t > > partition_argument_index() const
Get the partition argument index value.
static constexpr int32_t DEFAULT_CURSOR_BUFFER_SIZE
Default cursor buffer size.
const boost::optional< std::string > & schema() const
Gets the schema name.
std::chrono::milliseconds timeout() const
Gets the execution timeout in milliseconds.
sql_statement & clear_parameters()
Clears statement parameter values.
sql_statement(hazelcast_client &client, std::string query)
Creates a statement with the given query.
int32_t cursor_buffer_size() const
Gets the cursor buffer size (measured in the number of rows).
const std::string & sql() const
static constexpr std::chrono::milliseconds TIMEOUT_NOT_SET
Value for the timeout that is not set.
sql_expected_result_type expected_result_type() const
Gets the expected result type.