Hazelcast C++ Client
Hazelcast C++ Client Library
Loading...
Searching...
No Matches
sql.cpp
1/*
2 * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <boost/uuid/random_generator.hpp>
18
19#include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
20#include "hazelcast/client/protocol/codec/codecs.h"
21#include "hazelcast/client/protocol/codec/builtin/sql_page_codec.h"
22#include "hazelcast/client/spi/ClientContext.h"
23#include "hazelcast/client/spi/impl/ClientInvocation.h"
24#include "hazelcast/client/sql/impl/query_id.h"
25#include "hazelcast/client/sql/sql_column_metadata.h"
26#include "hazelcast/client/sql/sql_column_type.h"
27#include "hazelcast/client/sql/sql_result.h"
28#include "hazelcast/client/sql/sql_statement.h"
29#include "hazelcast/client/sql/hazelcast_sql_exception.h"
30#include "hazelcast/client/sql/sql_service.h"
31#include "hazelcast/client/hazelcast_client.h"
32#include "hazelcast/client/sql/impl/query_utils.h"
33#include "hazelcast/client/sql/impl/sql_error_code.h"
34#include "hazelcast/client/sql/sql_row_metadata.h"
35#include "hazelcast/client/sql/sql_page.h"
36
37namespace hazelcast {
38namespace client {
39namespace sql {
40
41sql_service::sql_service(client::spi::ClientContext& context)
42 : client_context_(context)
43{
44 is_smart_routing_ = client_context_.get_client_config()
45 .get_network_config()
46 .is_smart_routing();
47
48 const auto& client_properties = client_context_.get_client_properties();
49 const int32_t partition_arg_cache_size = client_properties.get_integer(
50 client_properties.partition_arg_cache_size());
51 const int32_t partition_arg_cache_threshold =
52 partition_arg_cache_size + std::min(partition_arg_cache_size / 10, 50);
53 partition_argument_index_cache_ =
54 std::make_shared<impl::read_optimized_lru_cache<std::string, int32_t>>(
55 partition_arg_cache_size, partition_arg_cache_threshold);
56}
57
58sql::impl::query_id
59sql_service::create_query_id(
60 const std::shared_ptr<connection::Connection>& query_conn)
61{
62 auto local_id = client_context_.random_uuid();
63 auto member_id = query_conn->get_remote_uuid();
64
65 return { member_id, local_id };
66}
67
68boost::future<std::shared_ptr<sql_result>>
70{
71 using protocol::ClientMessage;
72
73 auto statement_par_arg_index_ptr = statement.partition_argument_index();
74 int32_t statement_par_arg_index = statement_par_arg_index_ptr != nullptr
75 ? statement_par_arg_index_ptr->load()
76 : -1;
77
78 auto arg_index = statement_par_arg_index != -1
79 ? statement_par_arg_index
80 : (partition_argument_index_cache_->get_or_default(
81 statement.sql(), -1));
82
83 auto partition_id = extract_partition_id(statement, arg_index);
84 std::shared_ptr<connection::Connection> query_conn =
85 partition_id ? query_connection(partition_id.value())
86 : query_connection();
87
88 sql::impl::query_id qid = create_query_id(query_conn);
89
90 auto request = protocol::codec::sql_execute_encode(
91 statement.sql(),
92 statement.serialized_parameters_,
93 static_cast<int64_t>(statement.timeout().count()),
94 static_cast<int32_t>(statement.cursor_buffer_size()),
95 statement.schema() ? &statement.schema().value() : nullptr,
96 static_cast<byte>(statement.expected_result_type()),
97 qid,
98 false);
99
100 auto invocation = spi::impl::ClientInvocation::create(
101 client_context_, request, "", query_conn);
102
103 auto cursor_buffer_size = statement.cursor_buffer_size();
104
105 std::weak_ptr<std::atomic<int32_t>> statement_par_arg_index_weak_ptr =
106 statement_par_arg_index_ptr;
107
108 auto sql_query = statement.sql();
109 return invocation->invoke().then(
110 boost::launch::sync,
111 [this,
112 query_conn,
113 qid,
114 cursor_buffer_size,
115 sql_query,
116 arg_index,
117 statement_par_arg_index_weak_ptr](
118 boost::future<ClientMessage> response_fut) {
119 try {
120 auto response = response_fut.get();
121 return handle_execute_response(sql_query,
122 arg_index,
123 response,
124 query_conn,
125 qid,
126 cursor_buffer_size,
127 statement_par_arg_index_weak_ptr);
128 } catch (const std::exception& e) {
129 rethrow(e, query_conn);
130 }
131 assert(0);
132 return std::shared_ptr<sql_result>();
133 });
134}
135
136boost::future<void>
137sql_service::close(const std::shared_ptr<connection::Connection>& connection,
138 impl::query_id id)
139{
140 auto close_message = protocol::codec::sql_close_encode(id);
141
142 auto invocation = spi::impl::ClientInvocation::create(
143 client_context_, close_message, "", connection);
144 return invocation->invoke().then(
145 [this, connection](boost::future<protocol::ClientMessage> f) {
146 try {
147 f.get();
148 } catch (const exception::iexception& e) {
149 rethrow(e, connection);
150 }
151
152 return;
153 });
154}
155
156sql_service::sql_execute_response_parameters
157sql_service::decode_execute_response(protocol::ClientMessage& msg)
158{
159 static constexpr size_t RESPONSE_UPDATE_COUNT_FIELD_OFFSET =
160 protocol::ClientMessage::RESPONSE_HEADER_LEN;
161 static constexpr size_t RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET =
162 RESPONSE_UPDATE_COUNT_FIELD_OFFSET + protocol::ClientMessage::INT64_SIZE;
163
164 static constexpr size_t RESPONSE_PARTITION_ARGUMENT_INDEX_FIELD_OFFSET =
165 RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET +
166 protocol::ClientMessage::BOOL_SIZE;
167
168 sql_execute_response_parameters response;
169
170 auto initial_frame_header = msg.read_frame_header();
171 msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN -
172 protocol::ClientMessage::SIZE_OF_FRAME_LENGTH_AND_FLAGS);
173
174 response.update_count = msg.get<int64_t>();
175
176 auto frame_len = static_cast<int32_t>(initial_frame_header.frame_len);
177 if (frame_len >=
178 static_cast<int32_t>(RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET +
179 protocol::ClientMessage::BOOL_SIZE)) {
180 response.is_infinite_rows = msg.get<bool>();
181 response.is_infinite_rows_exist = true;
182
183 uint32_t skip_frame_len =
184 static_cast<int32_t>(RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET +
185 protocol::ClientMessage::BOOL_SIZE);
186 if (frame_len >= static_cast<int32_t>(
187 RESPONSE_PARTITION_ARGUMENT_INDEX_FIELD_OFFSET +
188 protocol::ClientMessage::INT32_SIZE)) {
189 response.partition_argument_index = msg.get<int32_t>();
190 response.is_partition_argument_index_exists = true;
191 skip_frame_len += protocol::ClientMessage::INT32_SIZE;
192 } else {
193 response.is_partition_argument_index_exists = false;
194 }
195
196 // skip initial_frame
197 msg.rd_ptr(frame_len - skip_frame_len);
198 } else {
199 response.is_infinite_rows_exist = false;
200 // skip initial_frame
201 msg.rd_ptr(static_cast<std::size_t>(frame_len) -
202 RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET);
203 }
204
205 auto column_metadata = msg.get_nullable<std::vector<sql_column_metadata>>();
206 if (column_metadata) {
207 response.row_metadata = std::make_shared<sql_row_metadata>(
208 std::move(column_metadata.value()));
209 }
210 auto row_metadata = response.row_metadata;
211 auto page = msg.get_nullable<std::shared_ptr<sql::sql_page>>(
212 [row_metadata](protocol::ClientMessage& msg) {
213 return protocol::codec::builtin::sql_page_codec::decode(msg,
214 row_metadata);
215 });
216 if (page) {
217 response.first_page = *std::move(page);
218 }
219
220 response.error = msg.get_nullable<impl::sql_error>();
221
222 return response;
223}
224
225sql_service::sql_fetch_response_parameters
226sql_service::decode_fetch_response(protocol::ClientMessage message)
227{
228 // empty initial frame
229 message.skip_frame();
230
231 auto page = message.get_nullable<std::shared_ptr<sql::sql_page>>(
232 [](protocol::ClientMessage& msg) {
233 return protocol::codec::builtin::sql_page_codec::decode(msg);
234 });
235 auto error = message.get<boost::optional<impl::sql_error>>();
236 return { std::move(page), std::move(error) };
237}
238
239std::shared_ptr<sql_result>
240sql_service::handle_execute_response(
241 const std::string& sql_query,
242 const int32_t original_partition_argument_index,
243 protocol::ClientMessage& msg,
244 std::shared_ptr<connection::Connection> connection,
245 impl::query_id id,
246 int32_t cursor_buffer_size,
247 std::weak_ptr<std::atomic<int32_t>> statement_par_arg_index_ptr)
248{
249 auto response = decode_execute_response(msg);
250 if (response.error) {
251 BOOST_THROW_EXCEPTION(sql::hazelcast_sql_exception(
252 "sql_service::handle_execute_response",
253 std::move(response.error->originating_member_id),
254 response.error->code,
255 std::move(response.error->message),
256 std::move(response.error->suggestion)));
257 } else {
258 if (is_smart_routing_ && response.partition_argument_index !=
259 original_partition_argument_index) {
260 if (response.partition_argument_index != -1) {
261 partition_argument_index_cache_->put(
262 sql_query, response.partition_argument_index);
263
264 if (auto argument_index = statement_par_arg_index_ptr.lock()) {
265 argument_index->store(response.partition_argument_index);
266 }
267 } else {
268 partition_argument_index_cache_->remove(sql_query);
269 }
270 }
271 }
272
273 return std::shared_ptr<sql_result>(
274 new sql_result(&client_context_,
275 this,
276 std::move(connection),
277 id,
278 response.update_count,
279 std::move(response.row_metadata),
280 std::move(response.first_page),
281 cursor_buffer_size));
282}
283
284std::shared_ptr<connection::Connection>
285sql_service::query_connection()
286{
287 std::shared_ptr<connection::Connection> connection;
288 try {
289 auto& cs = client_context_.get_client_cluster_service();
290 connection =
291 client_context_.get_connection_manager().connection_for_sql(
292 [&]() {
293 return impl::query_utils::member_of_same_larger_version_group(
294 cs.get_member_list());
295 },
296 [&](boost::uuids::uuid id) { return cs.get_member(id); });
297
298 if (!connection) {
299 try {
300 throw exception::query(
301 static_cast<int32_t>(
302 impl::sql_error_code::CONNECTION_PROBLEM),
303 "Client is not connected");
304 } catch (const exception::query& e) {
305 rethrow(e);
306 }
307 }
308
309 } catch (const std::exception& e) {
310 rethrow(e);
311 }
312
313 return connection;
314}
315
316std::shared_ptr<connection::Connection>
317sql_service::query_connection(int32_t partition_id)
318{
319 std::shared_ptr<connection::Connection> connection;
320 try {
321 const auto node_id =
322 client_context_.get_partition_service().get_partition_owner(
323 partition_id);
324
325 if (node_id.is_nil()) {
326 return query_connection();
327 }
328
329 connection =
330 client_context_.get_connection_manager().get_connection(node_id);
331
332 if (connection == nullptr) {
333 return query_connection();
334 }
335 } catch (const std::exception& e) {
336 rethrow(e);
337 }
338
339 return connection;
340}
341
342boost::optional<int32_t>
343sql_service::extract_partition_id(const sql_statement& statement,
344 int32_t arg_index) const
345{
346 if (!is_smart_routing_) {
347 return boost::none;
348 }
349
350 if (statement.serialized_parameters_.size() == 0) {
351 return boost::none;
352 }
353
354 if (arg_index >=
355 static_cast<int32_t>(statement.serialized_parameters_.size()) ||
356 arg_index < 0) {
357 return boost::none;
358 }
359
360 const auto& key = statement.serialized_parameters_[arg_index];
361
362 return client_context_.get_partition_service().get_partition_id(key);
363}
364
365void
366sql_service::rethrow(const std::exception& e)
367{
368 // Make sure that access_control is thrown as a top-level exception
369 try {
370 std::rethrow_if_nested(e);
371 } catch (exception::access_control&) {
372 throw;
373 } catch (...) {
374 impl::query_utils::throw_public_exception(std::current_exception(),
375 client_id());
376 }
377
378 impl::query_utils::throw_public_exception(std::current_exception(),
379 client_id());
380}
381
382void
383sql_service::rethrow(const std::exception& cause,
384 const std::shared_ptr<connection::Connection>& connection)
385{
386 if (!connection->is_alive()) {
387 auto msg = (boost::format("Cluster topology changed while a query was "
388 "executed: Member cannot be reached: %1%") %
389 connection->get_remote_address())
390 .str();
391 return impl::query_utils::throw_public_exception(
392 std::make_exception_ptr(exception::query(
393 static_cast<int32_t>(impl::sql_error_code::CONNECTION_PROBLEM),
394 msg,
395 std::make_exception_ptr(cause))),
396 client_id());
397 }
398
399 return rethrow(cause);
400}
401
402boost::uuids::uuid
403sql_service::client_id()
404{
405 return client_context_.get_connection_manager().get_client_uuid();
406}
407
408boost::future<std::shared_ptr<sql_page>>
409sql_service::fetch_page(
410 const impl::query_id& q_id,
411 int32_t cursor_buffer_size,
412 const std::shared_ptr<connection::Connection>& connection)
413{
414 auto request_message =
415 protocol::codec::sql_fetch_encode(q_id, cursor_buffer_size);
416
417 auto response_future = spi::impl::ClientInvocation::create(
418 client_context_, request_message, "", connection)
419 ->invoke();
420
421 return response_future.then(
422 boost::launch::sync, [this](boost::future<protocol::ClientMessage> f) {
423 std::shared_ptr<sql_page> page;
424 try {
425 auto response_message = f.get();
426
427 sql_fetch_response_parameters response_params =
428 this->decode_fetch_response(std::move(response_message));
429
430 hazelcast::client::sql::sql_service::handle_fetch_response_error(
431 std::move(response_params.error));
432
433 page = std::move(*response_params.page);
434 } catch (exception::iexception&) {
435 impl::query_utils::throw_public_exception(
436 std::current_exception(), this->client_id());
437 }
438
439 return page;
440 });
441}
442
443void
444sql_service::handle_fetch_response_error(boost::optional<impl::sql_error> error)
445{
446 if (error) {
447 throw hazelcast_sql_exception(
448 "sql_service::handle_fetch_response_error",
449 error->originating_member_id,
450 error->code,
451 error->message,
452 error->suggestion);
453 }
454}
455
// Namespace-scope definitions of the static constexpr members of
// sql_statement. No initializer appears here, so the values are expected to
// come from the in-class declarations (not visible in this file).
// NOTE(review): presumably kept so the constants can be odr-used when building
// as pre-C++17 -- confirm against the supported language standard.
456constexpr std::chrono::milliseconds sql_statement::TIMEOUT_NOT_SET;
457constexpr std::chrono::milliseconds sql_statement::TIMEOUT_DISABLED;
458constexpr std::chrono::milliseconds sql_statement::DEFAULT_TIMEOUT;
459constexpr int32_t sql_statement::DEFAULT_CURSOR_BUFFER_SIZE;
460
462 : serialized_parameters_{}
463 , cursor_buffer_size_{ DEFAULT_CURSOR_BUFFER_SIZE }
464 , timeout_{ TIMEOUT_NOT_SET }
465 , expected_result_type_{ sql_expected_result_type::any }
466 , schema_{}
467 , partition_argument_index_{ std::make_shared<std::atomic<int32_t>>(-1) }
468 , serialization_service_(
469 spi::ClientContext(client).get_serialization_service())
470{
471 sql(std::move(query));
472}
473
474sql_statement::sql_statement(spi::ClientContext& client_context,
475 std::string query)
476 : serialized_parameters_{}
477 , cursor_buffer_size_{ DEFAULT_CURSOR_BUFFER_SIZE }
478 , timeout_{ TIMEOUT_NOT_SET }
479 , expected_result_type_{ sql_expected_result_type::any }
480 , schema_{}
481 , partition_argument_index_{ nullptr }
482 , serialization_service_(client_context.get_serialization_service())
483{
484 sql(std::move(query));
485}
486
487const std::string&
489{
490 return sql_;
491}
492
494sql_statement::sql(std::string sql_string)
495{
496 util::Preconditions::check_not_empty(sql_string, "SQL cannot be empty");
497
498 sql_ = std::move(sql_string);
499
500 return *this;
501}
502
505{
506 serialized_parameters_.clear();
507
508 return *this;
509}
510
511int32_t
513{
514 return cursor_buffer_size_;
515}
516
519{
520 util::Preconditions::check_positive(
521 size,
522 (boost::format("Cursor buffer size must be positive: %s") % size).str());
523 cursor_buffer_size_ = size;
524
525 return *this;
526}
527
528std::chrono::milliseconds
530{
531 return timeout_;
532}
533
535sql_statement::timeout(std::chrono::milliseconds timeout)
536{
537 auto timeout_msecs = timeout.count();
538 if (timeout_msecs < 0 && timeout != TIMEOUT_NOT_SET) {
539 throw exception::illegal_argument(
540 "sql_statement::timeout(std::chrono::milliseconds timeout)",
541 (boost::format("Timeout must be non-negative or -1: %1% msecs") %
542 timeout_msecs)
543 .str());
544 }
545
546 timeout_ = timeout;
547
548 return *this;
549}
550
551const boost::optional<std::string>&
553{
554 return schema_;
555}
556
558sql_statement::schema(boost::optional<std::string> schema)
559{
560 schema_ = std::move(schema);
561 return *this;
562}
563
564sql::sql_expected_result_type
566{
567 return expected_result_type_;
568}
569
571sql_statement::expected_result_type(sql::sql_expected_result_type type)
572{
573 expected_result_type_ = type;
574 return *this;
575}
576
577std::shared_ptr<std::atomic<int32_t>>
579{
580 return partition_argument_index_;
581}
582
584sql_statement::partition_argument_index(int32_t partition_argument_index)
585{
586 if (partition_argument_index < -1) {
587 BOOST_THROW_EXCEPTION(client::exception::illegal_argument(
588 "The argument index must be >=0, or -1"));
589 }
590 *partition_argument_index_ = partition_argument_index;
591 return *this;
592}
593
594hazelcast_sql_exception::hazelcast_sql_exception(
595 std::string source,
596 boost::uuids::uuid originating_member_id,
597 int32_t code,
598 boost::optional<std::string> message,
599 boost::optional<std::string> suggestion,
600 std::exception_ptr cause)
601 : hazelcast_(std::move(source),
602 message ? std::move(message).value() : "",
603 "",
604 std::move(cause))
605 , originating_member_id_(std::move(originating_member_id))
606 , code_(code)
607 , suggestion_(std::move(suggestion))
608{
609}
610
611const boost::uuids::uuid&
613{
614 return originating_member_id_;
615}
616
617int32_t
619{
620 return code_;
621}
622
623const boost::optional<std::string>&
625{
626 return suggestion_;
627}
628
629namespace impl {
630
631std::ostream&
632operator<<(std::ostream& os, const query_id& id)
633{
634 os << "query_id{member_id: " << boost::uuids::to_string(id.member_id)
635 << " local_id: " << boost::uuids::to_string(id.local_id) << "}";
636 return os;
637}
638
639void
640query_utils::throw_public_exception(std::exception_ptr exc,
641 boost::uuids::uuid id)
642{
643 try {
644 std::rethrow_exception(exc);
645 } catch (hazelcast_sql_exception& e) {
646 throw;
647 } catch (exception::query& e) {
648 auto originating_member_id = e.originating_member_uuid();
649 if (originating_member_id.is_nil()) {
650 originating_member_id = id;
651 }
652
653 throw hazelcast_sql_exception("query_utils::throw_public_exception",
654 originating_member_id,
655 e.code(),
656 e.get_message(),
657 e.suggestion(),
658 exc);
659 } catch (exception::iexception& ie) {
660 throw hazelcast_sql_exception(
661 "query_utils::throw_public_exception",
662 id,
663 static_cast<int32_t>(sql_error_code::GENERIC),
664 ie.get_message(),
665 boost::none,
666 exc);
667 }
668}
669
670boost::optional<member>
671query_utils::member_of_same_larger_version_group(
672 const std::vector<member>& members)
673{
674 // The members should have at most 2 different version (ignoring the patch
675 // version). Find a random member from the larger same-version group.
676
677 // we don't use 2-element array to save on litter
678 boost::optional<member::version> version0;
679 boost::optional<member::version> version1;
680 size_t count0 = 0;
681 size_t count1 = 0;
682
683 for (const auto& m : members) {
684 if (m.is_lite_member()) {
685 continue;
686 }
687 auto v = m.get_version();
688 if (!version0 || *version0 == v) {
689 version0 = v;
690 ++count0;
691 } else if (!version1 || *version1 == v) {
692 version1 = v;
693 ++count1;
694 } else {
695 throw exception::runtime(
696 "query_utils::member_of_same_larger_version_group",
697 (boost::format(
698 "More than 2 distinct member versions found: %1% , %2%") %
699 version0 % version1)
700 .str());
701 }
702 }
703
704 assert(count1 == 0 || count0 > 0);
705
706 // no data members
707 if (count0 == 0) {
708 return boost::none;
709 }
710
711 size_t count;
712 member::version version;
713 if (count0 > count1 || (count0 == count1 && *version0 > *version1)) {
714 count = count0;
715 version = *version0;
716 } else {
717 count = count1;
718 version = *version1;
719 }
720
721 // otherwise return a random member from the larger group
722 static thread_local std::mt19937 generator;
723 std::uniform_int_distribution<int> distribution(0, count - 1);
724 auto random_member_index = distribution(generator);
725 for (const auto& m : members) {
726 if (!m.is_lite_member() && m.get_version() == version) {
727 random_member_index--;
728 if (random_member_index < 0) {
729 return m;
730 }
731 }
732 }
733
734 throw exception::runtime("query_utils::member_of_same_larger_version_group",
735 "should never get here");
736}
737
738} // namespace impl
739
740sql_result::sql_result(spi::ClientContext* client_context,
741 sql_service* service,
742 std::shared_ptr<connection::Connection> connection,
743 impl::query_id id,
744 int64_t update_count,
745 std::shared_ptr<sql_row_metadata> row_metadata,
746 std::shared_ptr<sql_page> first_page,
747 int32_t cursor_buffer_size)
748 : client_context_(client_context)
749 , service_(service)
750 , connection_(std::move(connection))
751 , query_id_(id)
752 , update_count_(update_count)
753 , row_metadata_(std::move(row_metadata))
754 , first_page_(std::move(first_page))
755 , iterator_requested_(false)
756 , closed_(false)
757 , cursor_buffer_size_(cursor_buffer_size)
758{
759 if (row_metadata_) {
760 assert(first_page_);
761 first_page_->row_metadata(row_metadata_);
762 first_page_->serialization_service(
763 &client_context_->get_serialization_service());
764 update_count_ = -1;
765 } else {
766 closed_ = true;
767 }
768}
769
770int64_t
772{
773 return update_count_;
774}
775
776bool
778{
779 return update_count() == -1;
780}
781
784{
785 check_closed();
786
787 if (!first_page_) {
788 BOOST_THROW_EXCEPTION(exception::illegal_state(
789 "sql_result::iterator", "This result contains only update count"));
790 }
791
792 if (iterator_requested_) {
793 BOOST_THROW_EXCEPTION(exception::illegal_state(
794 "sql_result::page_iterator", "Iterator can be requested only once"));
795 }
796
797 iterator_requested_ = true;
798
799 return { shared_from_this(), first_page_ };
800}
801
803sql_result::pbegin(std::chrono::milliseconds timeout)
804{
805 return page_iterator_sync{ iterator(), timeout };
806}
807
808sql_result::page_iterator_sync
809sql_result::pend()
810{
811 return page_iterator_sync{};
812}
813
815sql_result::begin(std::chrono::milliseconds timeout)
816{
817 return row_iterator_sync{ pbegin(timeout) };
818}
819
821sql_result::end()
822{
823 return row_iterator_sync{};
824}
825
826void
827sql_result::check_closed() const
828{
829 if (closed_) {
830 impl::query_utils::throw_public_exception(
831 std::make_exception_ptr(exception::query(
832 static_cast<int32_t>(impl::sql_error_code::CANCELLED_BY_USER),
833 "Query was cancelled by the user")),
834 service_->client_id());
835 }
836}
837
838sql_result::row_iterator_sync::row_iterator_sync(page_iterator_sync&& iterator)
839 : iterator_{ std::move(iterator) }
840 , row_idx_{}
841{
842}
843
844void
846{
847 iterator_.set_timeout(timeout);
848}
849
850std::chrono::milliseconds
852{
853 return iterator_.timeout();
854}
855
858{
859 while (iterator_->rows().empty()) {
860 ++iterator_;
861 }
862
863 return iterator_->rows().at(row_idx_);
864}
865
868{
869 return &(operator*());
870}
871
874{
875 ++row_idx_;
876
877 while (iterator_ != page_iterator_sync{} &&
878 row_idx_ >= iterator_->rows().size()) {
879 ++iterator_;
880 row_idx_ = 0;
881 }
882
883 return *this;
884}
885
886bool
887operator==(const sql_result::row_iterator_sync& x,
889{
890 return x.iterator_ == y.iterator_;
891}
892
893bool
894operator!=(const sql_result::row_iterator_sync& x,
896{
897 return !(x == y);
898}
899
900boost::future<void>
902{
903 if (closed_) {
904 return boost::make_ready_future();
905 }
906
907 auto release_resources = [this]() {
908 {
909 std::lock_guard<std::mutex> guard{ mtx_ };
910 closed_ = true;
911
912 connection_.reset();
913 }
914
915 row_metadata_.reset();
916 first_page_.reset();
917 };
918
919 try {
920 auto f = service_->close(connection_, query_id_);
921
922 release_resources();
923
924 return f;
925 } catch (const std::exception& e) {
926 release_resources();
927
928 service_->rethrow(e);
929 }
930
931 // This should not be reached.
932 return boost::make_ready_future();
933}
934
935boost::future<std::shared_ptr<sql_page>>
936sql_result::fetch_page()
937{
938 std::lock_guard<std::mutex> guard{ mtx_ };
939
940 check_closed();
941 return service_->fetch_page(query_id_, cursor_buffer_size_, connection_);
942}
943
944const sql_row_metadata&
946{
947 if (!row_metadata_) {
948 throw exception::illegal_state(
949 "sql_result::row_metadata", "This result contains only update count");
950 }
951
952 return *row_metadata_;
953}
954
956{
957 try {
958 close().get();
959 } catch (...) {
960 // ignore
961 HZ_LOG(client_context_->get_logger(),
962 info,
963 (boost::format("[sql_result::~sql_result()] Exception while "
964 "closing the query result. Query id: %1%") %
965 query_id_)
966 .str());
967 }
968}
969
970sql_result::page_iterator::page_iterator(std::shared_ptr<sql_result> result,
971 std::shared_ptr<sql_page> first_page)
972 : in_progress_{ std::make_shared<std::atomic<bool>>(false) }
973 , last_{ std::make_shared<std::atomic<bool>>(false) }
974 , row_metadata_{ result->row_metadata_ }
975 , serialization_(&result->client_context_->get_serialization_service())
976 , result_{ std::move(result) }
977 , first_page_(std::move(first_page))
978{
979}
980
981boost::future<std::shared_ptr<sql_page>>
983{
984 result_->check_closed();
985
986 if (first_page_) {
987 auto page = std::move(first_page_);
988
989 page->serialization_service(serialization_);
990 page->row_metadata(row_metadata_);
991 *last_ = page->last();
992
993 return boost::make_ready_future<std::shared_ptr<sql_page>>(page);
994 }
995
996 if (*in_progress_) {
997 BOOST_THROW_EXCEPTION(
998 exception::illegal_access("sql_result::page_iterator::next",
999 "Fetch page operation is already in "
1000 "progress so next must not be called."));
1001 }
1002
1003 if (*last_) {
1004 BOOST_THROW_EXCEPTION(exception::no_such_element(
1005 "sql_result::page_iterator::next",
1006 "Last page is already retrieved so there are no more pages."));
1007 }
1008
1009 *in_progress_ = true;
1010
1011 auto page_future = result_->fetch_page();
1012
1013 std::weak_ptr<std::atomic<bool>> last_w{ last_ };
1014 std::weak_ptr<std::atomic<bool>> in_progress_w{ in_progress_ };
1015 std::shared_ptr<sql_row_metadata> row_metadata{ row_metadata_ };
1016 auto result = result_;
1017 auto serialization_service = serialization_;
1018
1019 return page_future.then(
1020 boost::launch::sync,
1021 [serialization_service, row_metadata, last_w, in_progress_w, result](
1022 boost::future<std::shared_ptr<sql_page>> page_f) {
1023 try {
1024 auto page = page_f.get();
1025
1026 result->check_closed();
1027 page->serialization_service(serialization_service);
1028 page->row_metadata(std::move(row_metadata));
1029
1030 auto last = last_w.lock();
1031
1032 if (last)
1033 *last = page->last();
1034
1035 auto in_progress = in_progress_w.lock();
1036
1037 if (in_progress)
1038 *in_progress = false;
1039
1040 return page;
1041 } catch (...) {
1042 auto in_progress = in_progress_w.lock();
1043
1044 if (in_progress)
1045 *in_progress = false;
1046
1047 throw;
1048 }
1049 });
1050}
1051
1052bool
1054{
1055 result_->check_closed();
1056 return !*last_;
1057}
1058
1059sql_result::page_iterator_sync::non_copyables::non_copyables(
1060 page_iterator&& iter)
1061 : preloaded_page_{}
1062 , iter_{ std::move(iter) }
1063{
1064}
1065
1066sql_result::page_iterator_sync::page_iterator_sync(
1067 page_iterator&& iter,
1068 std::chrono::milliseconds timeout)
1069 : block_(std::make_shared<non_copyables>(std::move(iter)))
1070 , current_{ block_->iter_.next().get() }
1071 , timeout_{ timeout }
1072{
1073 if (block_->iter_.has_next()) {
1074 block_->preloaded_page_ = block_->iter_.next();
1075 }
1076}
1077
1078void
1080{
1081 timeout_ = t;
1082}
1083
1084std::chrono::milliseconds
1086{
1087 return timeout_;
1088}
1089
1090bool
1091operator==(const sql_result::page_iterator_sync& x,
1093{
1094 return !x.block_ && !y.block_;
1095}
1096
1097bool
1098operator!=(const sql_result::page_iterator_sync& x,
1100{
1101 return !(x == y);
1102}
1103
1104sql_result::page_iterator_sync&
1106{
1107 if (!current_) {
1108 BOOST_THROW_EXCEPTION(exception::no_such_element(
1109 "sql_result::page_iterator_sync::operator++()",
1110 "Iterator already points to past-end element."));
1111 } else if (current_->last()) {
1112 block_.reset();
1113 current_.reset();
1114 } else {
1115 if (!block_->preloaded_page_.is_ready() && timeout_.count() > 0) {
1116 (void)block_->preloaded_page_.wait_for(
1117 boost::chrono::milliseconds{ timeout_.count() });
1118
1119 if (block_->preloaded_page_.is_ready()) {
1120 current_ = block_->preloaded_page_.get();
1121 } else {
1122 BOOST_THROW_EXCEPTION(exception::no_such_element());
1123 }
1124 } else {
1125 current_ = block_->preloaded_page_.get();
1126 }
1127
1128 if (block_->iter_.has_next()) {
1129 block_->preloaded_page_ = block_->iter_.next();
1130 }
1131 }
1132
1133 return *this;
1134}
1135
1136std::shared_ptr<sql_page>
1138{
1139 if (!current_) {
1140 BOOST_THROW_EXCEPTION(exception::no_such_element(
1141 "sql_result::page_iterator_sync::operator++()",
1142 "Iterator points to past-end element."));
1143 }
1144 return current_;
1145}
1146
1147std::shared_ptr<sql_page>
1152
1153std::size_t
1154sql_page::page_data::column_count() const
1155{
1156 return column_types_.size();
1157}
1158
1159std::size_t
1160sql_page::page_data::row_count() const
1161{
1162 return columns_[0].size();
1163}
1164
1165sql_page::sql_row::sql_row(size_t row_index, std::shared_ptr<page_data> shared)
1166 : row_index_(row_index)
1167 , page_data_(std::move(shared))
1168{
1169}
1170
1171std::size_t
1172sql_page::sql_row::resolve_index(const std::string& column_name) const
1173{
1174 auto it = page_data_->row_metadata_->find_column(column_name);
1175 if (it == page_data_->row_metadata_->end()) {
1176 throw exception::illegal_argument(
1177 "sql_page::get_object(const std::string &)",
1178 (boost::format("Column %1% doesn't exist") % column_name).str());
1179 }
1180 auto column_index = it->second;
1181 return column_index;
1182}
1183
1184const sql_row_metadata&
1186{
1187 return *page_data_->row_metadata_;
1188}
1189
1190void
1191sql_page::sql_row::check_index(size_t index) const
1192{
1193 if (index >= row_metadata().column_count()) {
1194 throw exception::index_out_of_bounds(
1195 "sql_page::sql_row::check_index",
1196 (boost::format("Column index is out of range: %1%") % index).str());
1197 }
1198}
1199
1200sql_page::sql_page(std::vector<sql_column_type> column_types,
1201 std::vector<column> columns,
1202 bool last,
1203 std::shared_ptr<sql_row_metadata> row_metadata)
1204 : page_data_{ new page_data{ std::move(column_types),
1205 std::move(columns),
1206 std::move(row_metadata),
1207 nullptr } }
1208 , last_(last)
1209{
1210}
1211
1212void
1213sql_page::construct_rows()
1214{
1215 auto count = row_count();
1216 rows_.clear();
1217 for (size_t i = 0; i < count; ++i) {
1218 rows_.emplace_back(i, page_data_);
1219 }
1220}
1221
1222const std::vector<sql_column_type>&
1224{
1225 return page_data_->column_types_;
1226}
1227
1228bool
1230{
1231 return last_;
1232}
1233
1234std::size_t
1236{
1237 return page_data_->column_count();
1238}
1239
1240std::size_t
1242{
1243 return page_data_->row_count();
1244}
1245
1246const std::vector<sql_page::sql_row>&
1248{
1249 return rows_;
1250}
1251
1252void
1253sql_page::row_metadata(std::shared_ptr<sql_row_metadata> row_meta)
1254{
1255 page_data_->row_metadata_ = std::move(row_meta);
1256}
1257
1258void
1259sql_page::serialization_service(serialization::pimpl::SerializationService* ss)
1260{
1261 page_data_->serialization_service_ = ss;
1262}
1263
1264sql_row_metadata::sql_row_metadata(std::vector<sql_column_metadata> columns)
1265 : columns_(std::move(columns))
1266{
1267 assert(!columns_.empty());
1268
1269 name_to_index_.reserve(columns_.size());
1270 for (std::size_t i = 0; i < columns_.size(); ++i) {
1271 name_to_index_.emplace(columns_[i].name, i);
1272 }
1273}
1274
1275std::size_t
1277{
1278 return columns_.size();
1279}
1280
1282sql_row_metadata::column(std::size_t index) const
1283{
1284 if (index >= columns_.size()) {
1285 throw exception::index_out_of_bounds(
1286 "sql_row_metadata::column(std::size_t index)",
1287 (boost::format("Column index is out of bounds: %1%") % index).str());
1288 }
1289
1290 return columns_[index];
1291}
1292
1293const std::vector<sql_column_metadata>&
1295{
1296 return columns_;
1297}
1298
1300sql_row_metadata::find_column(const std::string& column_name) const
1301{
1302 return name_to_index_.find(column_name);
1303}
1304
1307{
1308 return name_to_index_.end();
1309}
1310bool
1311operator==(const sql_row_metadata& lhs, const sql_row_metadata& rhs)
1312{
1313 return lhs.columns_ == rhs.columns_;
1314}
1315
1316bool
1317operator==(const sql_column_metadata& lhs, const sql_column_metadata& rhs)
1318{
1319 return lhs.name == rhs.name && lhs.type == rhs.type &&
1320 lhs.nullable == rhs.nullable;
1321}
1322
1323} // namespace sql
1324} // namespace client
1325} // namespace hazelcast
Base class for all exceptions originating from Hazelcast methods.
Definition iexception.h:49
int32_t code() const
Gets the internal error code associated with the exception.
Definition sql.cpp:618
const boost::uuids::uuid & originating_member_id() const
Gets ID of the member that caused or initiated an error condition.
Definition sql.cpp:612
const boost::optional< std::string > & suggestion() const
Gets the suggested SQL statement to remediate experienced error.
Definition sql.cpp:624
const sql_row_metadata & row_metadata() const
Gets the row metadata.
Definition sql.cpp:1185
std::size_t row_count() const
Returns the number of rows in this page.
Definition sql.cpp:1241
sql_page(std::vector< sql_column_type > column_types, std::vector< column > columns, bool last, std::shared_ptr< sql_row_metadata > row_metadata=nullptr)
Constructs an sql_page from the response returned from the server.
Definition sql.cpp:1200
std::size_t column_count() const
Returns the number of columns in each row.
Definition sql.cpp:1235
const std::vector< sql_row > & rows() const
Returns the rows of this page.
Definition sql.cpp:1247
const std::vector< sql_column_type > & column_types() const
Returns the types of the columns in each row.
Definition sql.cpp:1223
Copying is allowed for convenience, but it performs a shallow copy and should therefore be avoided.
Definition sql_result.h:114
std::shared_ptr< sql_page > operator*() const
Dereferences current page.
Definition sql.cpp:1137
std::chrono::milliseconds timeout() const
Retrieves the timeout.
Definition sql.cpp:1085
page_iterator_sync & operator++()
Fetches the next page in a blocking manner.
Definition sql.cpp:1105
void set_timeout(std::chrono::milliseconds)
Sets timeout for page fetch operation.
Definition sql.cpp:1079
std::shared_ptr< sql_page > operator->() const
Dereferences current page.
Definition sql.cpp:1148
Copying is allowed for convenience, but it performs a shallow copy and should therefore be avoided.
Definition sql_result.h:75
bool has_next() const
Tells whether there are pages to be retrieved.
Definition sql.cpp:1053
boost::future< std::shared_ptr< sql_page > > next()
Fetches the new page.
Definition sql.cpp:982
Copying is allowed for convenience, but it performs a shallow copy and should therefore be avoided.
Definition sql_result.h:187
void set_timeout(std::chrono::milliseconds)
Sets timeout for page fetch operation.
Definition sql.cpp:845
row_iterator_sync & operator++()
Fetches the next row in a blocking manner.
Definition sql.cpp:873
const sql_page::sql_row & operator*() const
Returns current row.
Definition sql.cpp:857
const sql_page::sql_row * operator->() const
Returns current row.
Definition sql.cpp:867
std::chrono::milliseconds timeout() const
Retrieves the timeout.
Definition sql.cpp:851
bool row_set() const
Returns whether this result has rows to iterate using the iterator() method.
Definition sql.cpp:777
const sql_row_metadata & row_metadata() const
Gets the row metadata.
Definition sql.cpp:945
page_iterator iterator()
Returns an iterator over the result pages.
Definition sql.cpp:783
virtual ~sql_result()
The destructor closes the result if it is still open.
Definition sql.cpp:955
int64_t update_count() const
Returns the number of rows updated by the statement or -1 if this result is a row set.
Definition sql.cpp:771
boost::future< void > close()
Release the resources associated with the query result.
Definition sql.cpp:901
std::unordered_map< std::string, std::size_t >::const_iterator const_iterator
key is the column name, value is the column index.
const sql_column_metadata & column(std::size_t index) const
Gets column metadata.
Definition sql.cpp:1282
std::size_t column_count() const
Gets the number of columns in the row.
Definition sql.cpp:1276
const_iterator end() const
Constant indicating that the column is not found.
Definition sql.cpp:1306
const_iterator find_column(const std::string &column_name) const
Find index of the column with the given name.
Definition sql.cpp:1300
const std::vector< sql_column_metadata > & columns() const
Gets columns metadata.
Definition sql.cpp:1294
A service to execute SQL statements.
Definition sql_service.h:90
boost::future< std::shared_ptr< sql_result > > execute(const std::string &query, const Params &... params)
Convenient method to execute a distributed query with the given parameter values.
Definition of an SQL statement.
std::shared_ptr< std::atomic< int32_t > > partition_argument_index() const
Get the partition argument index value.
Definition sql.cpp:578
static constexpr int32_t DEFAULT_CURSOR_BUFFER_SIZE
Default cursor buffer size.
const boost::optional< std::string > & schema() const
Gets the schema name.
Definition sql.cpp:552
std::chrono::milliseconds timeout() const
Gets the execution timeout in milliseconds.
Definition sql.cpp:529
sql_statement & clear_parameters()
Clears statement parameter values.
Definition sql.cpp:504
sql_statement(hazelcast_client &client, std::string query)
Creates a statement with the given query.
Definition sql.cpp:461
int32_t cursor_buffer_size() const
Gets the cursor buffer size (measured in the number of rows).
Definition sql.cpp:512
const std::string & sql() const
Definition sql.cpp:488
static constexpr std::chrono::milliseconds TIMEOUT_NOT_SET
Value for the timeout that is not set.
sql_expected_result_type expected_result_type() const
Gets the expected result type.
Definition sql.cpp:565
STL namespace.