Hazelcast C++ Client
Hazelcast C++ Client Library
sql.cpp
1 /*
2  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <boost/uuid/random_generator.hpp>
18 
19 #include "hazelcast/client/connection/ClientConnectionManagerImpl.h"
20 #include "hazelcast/client/protocol/codec/codecs.h"
21 #include "hazelcast/client/protocol/codec/builtin/sql_page_codec.h"
22 #include "hazelcast/client/spi/ClientContext.h"
23 #include "hazelcast/client/spi/impl/ClientInvocation.h"
24 #include "hazelcast/client/sql/impl/query_id.h"
25 #include "hazelcast/client/sql/sql_column_metadata.h"
26 #include "hazelcast/client/sql/sql_column_type.h"
27 #include "hazelcast/client/sql/sql_result.h"
28 #include "hazelcast/client/sql/sql_statement.h"
29 #include "hazelcast/client/sql/hazelcast_sql_exception.h"
30 #include "hazelcast/client/sql/sql_service.h"
31 #include "hazelcast/client/hazelcast_client.h"
32 #include "hazelcast/client/sql/impl/query_utils.h"
33 #include "hazelcast/client/sql/impl/sql_error_code.h"
34 #include "hazelcast/client/sql/sql_row_metadata.h"
35 #include "hazelcast/client/sql/sql_page.h"
36 
37 namespace hazelcast {
38 namespace client {
39 namespace sql {
40 
41 sql_service::sql_service(client::spi::ClientContext& context)
42  : client_context_(context)
43 {
44 }
45 
46 sql::impl::query_id
47 sql_service::create_query_id(
48  const std::shared_ptr<connection::Connection>& query_conn)
49 {
50  auto local_id = client_context_.random_uuid();
51  auto member_id = query_conn->get_remote_uuid();
52 
53  return { member_id, local_id };
54 }
55 
56 boost::future<std::shared_ptr<sql_result>>
57 sql_service::execute(const sql_statement& statement)
58 {
59  using protocol::ClientMessage;
60 
61  auto query_conn = query_connection();
62  sql::impl::query_id qid = create_query_id(query_conn);
63 
64  auto request = protocol::codec::sql_execute_encode(
65  statement.sql(),
66  statement.serialized_parameters_,
67  static_cast<int64_t>(statement.timeout().count()),
68  static_cast<int32_t>(statement.cursor_buffer_size()),
69  statement.schema() ? &statement.schema().value() : nullptr,
70  static_cast<byte>(statement.expected_result_type()),
71  qid,
72  false);
73 
74  auto invocation = spi::impl::ClientInvocation::create(
75  client_context_, request, "", query_conn);
76 
77  auto cursor_buffer_size = statement.cursor_buffer_size();
78 
79  return invocation->invoke().then(
80  boost::launch::sync,
81  [this, query_conn, qid, cursor_buffer_size](
82  boost::future<ClientMessage> response_fut) {
83  try {
84  auto response = response_fut.get();
85  return handle_execute_response(
86  response, query_conn, qid, cursor_buffer_size);
87  } catch (const std::exception& e) {
88  rethrow(e, query_conn);
89  }
90  assert(0);
91  return std::shared_ptr<sql_result>();
92  });
93 }
94 
95 boost::future<void>
96 sql_service::close(const std::shared_ptr<connection::Connection>& connection,
97  impl::query_id id)
98 {
99  auto close_message = protocol::codec::sql_close_encode(id);
100 
101  auto invocation = spi::impl::ClientInvocation::create(
102  client_context_, close_message, "", connection);
103  return invocation->invoke().then(
104  [this, connection](boost::future<protocol::ClientMessage> f) {
105  try {
106  f.get();
107  } catch (const exception::iexception& e) {
108  rethrow(e, connection);
109  }
110 
111  return;
112  });
113 }
114 
115 sql_service::sql_execute_response_parameters
116 sql_service::decode_execute_response(protocol::ClientMessage& msg)
117 {
118  static constexpr size_t RESPONSE_UPDATE_COUNT_FIELD_OFFSET =
119  protocol::ClientMessage::RESPONSE_HEADER_LEN;
120  static constexpr size_t RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET =
121  RESPONSE_UPDATE_COUNT_FIELD_OFFSET + protocol::ClientMessage::INT64_SIZE;
122 
123  sql_execute_response_parameters response;
124 
125  auto initial_frame_header = msg.read_frame_header();
126  msg.rd_ptr(protocol::ClientMessage::RESPONSE_HEADER_LEN -
127  protocol::ClientMessage::SIZE_OF_FRAME_LENGTH_AND_FLAGS);
128 
129  response.update_count = msg.get<int64_t>();
130 
131  auto frame_len = static_cast<int32_t>(initial_frame_header.frame_len);
132  if (frame_len >=
133  static_cast<int32_t>(RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET +
134  protocol::ClientMessage::BOOL_SIZE)) {
135  response.is_infinite_rows = msg.get<bool>();
136  response.is_infinite_rows_exist = true;
137  // skip initial_frame
138  msg.rd_ptr(frame_len -
139  static_cast<int32_t>(RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET +
140  protocol::ClientMessage::BOOL_SIZE));
141  } else {
142  response.is_infinite_rows_exist = false;
143  // skip initial_frame
144  msg.rd_ptr(static_cast<std::size_t>(frame_len) -
145  RESPONSE_IS_INFINITE_ROWS_FIELD_OFFSET);
146  }
147 
148  auto column_metadata = msg.get_nullable<std::vector<sql_column_metadata>>();
149  if (column_metadata) {
150  response.row_metadata = std::make_shared<sql_row_metadata>(std::move(column_metadata.value()));
151  }
152  auto row_metadata = response.row_metadata;
153  auto page = msg.get_nullable<std::shared_ptr<sql::sql_page>>(
154  [row_metadata](protocol::ClientMessage& msg) {
155  return protocol::codec::builtin::sql_page_codec::decode(msg,
156  row_metadata);
157  });
158  if (page) {
159  response.first_page = *std::move(page);
160  }
161 
162  response.error = msg.get_nullable<impl::sql_error>();
163 
164  return response;
165 }
166 
167 sql_service::sql_fetch_response_parameters
168 sql_service::decode_fetch_response(protocol::ClientMessage message)
169 {
170  // empty initial frame
171  message.skip_frame();
172 
173  auto page =
174  message.get_nullable<std::shared_ptr<sql::sql_page>>([](protocol::ClientMessage& msg) {
175  return protocol::codec::builtin::sql_page_codec::decode(msg);
176  });
177  auto error = message.get<boost::optional<impl::sql_error>>();
178  return { std::move(page), std::move(error) };
179 }
180 
181 std::shared_ptr<sql_result>
182 sql_service::handle_execute_response(
183  protocol::ClientMessage& msg,
184  std::shared_ptr<connection::Connection> connection,
185  impl::query_id id,
186  int32_t cursor_buffer_size)
187 {
188  auto response = decode_execute_response(msg);
189  if (response.error) {
190  BOOST_THROW_EXCEPTION(sql::hazelcast_sql_exception(
191  "sql_service::handle_execute_response",
192  std::move(response.error->originating_member_id),
193  response.error->code,
194  std::move(response.error->message),
195  std::move(response.error->suggestion)));
196  }
197 
198  return std::shared_ptr<sql_result>(
199  new sql_result(&client_context_,
200  this,
201  std::move(connection),
202  id,
203  response.update_count,
204  std::move(response.row_metadata),
205  std::move(response.first_page),
206  cursor_buffer_size));
207 }
208 
209 std::shared_ptr<connection::Connection>
210 sql_service::query_connection()
211 {
212  std::shared_ptr<connection::Connection> connection;
213  try {
214  auto& cs = client_context_.get_client_cluster_service();
215  connection =
216  client_context_.get_connection_manager().connection_for_sql(
217  [&]() {
218  return impl::query_utils::member_of_same_larger_version_group(
219  cs.get_member_list());
220  },
221  [&](boost::uuids::uuid id) { return cs.get_member(id); });
222 
223  if (!connection) {
224  try {
225  throw exception::query(
226  static_cast<int32_t>(
227  impl::sql_error_code::CONNECTION_PROBLEM),
228  "Client is not connected");
229  } catch (const exception::query& e) {
230  rethrow(e);
231  }
232  }
233 
234  } catch (const std::exception& e) {
235  rethrow(e);
236  }
237 
238  return connection;
239 }
240 
241 void
242 sql_service::rethrow(const std::exception& e)
243 {
244  // Make sure that access_control is thrown as a top-level exception
245  try {
246  std::rethrow_if_nested(e);
247  } catch (exception::access_control&) {
248  throw;
249  } catch (...) {
250  impl::query_utils::throw_public_exception(std::current_exception(),
251  client_id());
252  }
253 
254  impl::query_utils::throw_public_exception(std::current_exception(),
255  client_id());
256 }
257 
258 void
259 sql_service::rethrow(const std::exception& cause,
260  const std::shared_ptr<connection::Connection>& connection)
261 {
262  if (!connection->is_alive()) {
263  auto msg = (boost::format("Cluster topology changed while a query was "
264  "executed: Member cannot be reached: %1%") %
265  connection->get_remote_address())
266  .str();
267  return impl::query_utils::throw_public_exception(
268  std::make_exception_ptr(exception::query(
269  static_cast<int32_t>(impl::sql_error_code::CONNECTION_PROBLEM),
270  msg,
271  std::make_exception_ptr(cause))),
272  client_id());
273  }
274 
275  return rethrow(cause);
276 }
277 
278 boost::uuids::uuid
279 sql_service::client_id()
280 {
281  return client_context_.get_connection_manager().get_client_uuid();
282 }
283 
284 boost::future<std::shared_ptr<sql_page>>
285 sql_service::fetch_page(
286  const impl::query_id& q_id,
287  int32_t cursor_buffer_size,
288  const std::shared_ptr<connection::Connection>& connection)
289 {
290  auto request_message =
291  protocol::codec::sql_fetch_encode(q_id, cursor_buffer_size);
292 
293  auto response_future = spi::impl::ClientInvocation::create(
294  client_context_, request_message, "", connection)
295  ->invoke();
296 
297  return response_future.then(
298  boost::launch::sync, [this](boost::future<protocol::ClientMessage> f) {
299  std::shared_ptr<sql_page> page;
300  try {
301  auto response_message = f.get();
302 
303  sql_fetch_response_parameters response_params =
304  this->decode_fetch_response(std::move(response_message));
305 
306  hazelcast::client::sql::sql_service::handle_fetch_response_error(
307  std::move(response_params.error));
308 
309  page = std::move(*response_params.page);
310  } catch (exception::iexception&) {
311  impl::query_utils::throw_public_exception(
312  std::current_exception(), this->client_id());
313  }
314 
315  return page;
316  });
317 }
318 
319 void
320 sql_service::handle_fetch_response_error(boost::optional<impl::sql_error> error)
321 {
322  if (error) {
323  throw hazelcast_sql_exception(
324  "sql_service::handle_fetch_response_error",
325  error->originating_member_id,
326  error->code,
327  error->message,
328  error->suggestion);
329  }
330 }
331 
332 constexpr std::chrono::milliseconds sql_statement::TIMEOUT_NOT_SET;
333 constexpr std::chrono::milliseconds sql_statement::TIMEOUT_DISABLED;
334 constexpr std::chrono::milliseconds sql_statement::DEFAULT_TIMEOUT;
335 constexpr int32_t sql_statement::DEFAULT_CURSOR_BUFFER_SIZE;
336 
337 sql_statement::sql_statement(hazelcast_client& client, std::string query)
338  : serialized_parameters_{}
339  , cursor_buffer_size_{ DEFAULT_CURSOR_BUFFER_SIZE }
340  , timeout_{ TIMEOUT_NOT_SET }
341  , expected_result_type_{ sql_expected_result_type::any }
342  , schema_{}
343  , serialization_service_(
344  spi::ClientContext(client).get_serialization_service())
345 {
346  sql(std::move(query));
347 }
348 
349 sql_statement::sql_statement(spi::ClientContext& client_context,
350  std::string query)
351  : serialized_parameters_{}
352  , cursor_buffer_size_{ DEFAULT_CURSOR_BUFFER_SIZE }
353  , timeout_{ TIMEOUT_NOT_SET }
354  , expected_result_type_{ sql_expected_result_type::any }
355  , schema_{}
356  , serialization_service_(client_context.get_serialization_service())
357 {
358  sql(std::move(query));
359 }
360 
361 const std::string&
363 {
364  return sql_;
365 }
366 
368 sql_statement::sql(std::string sql_string)
369 {
370  util::Preconditions::check_not_empty(sql_string, "SQL cannot be empty");
371 
372  sql_ = std::move(sql_string);
373 
374  return *this;
375 }
376 
379 {
380  serialized_parameters_.clear();
381 
382  return *this;
383 }
384 
385 int32_t
387 {
388  return cursor_buffer_size_;
389 }
390 
393 {
394  util::Preconditions::check_positive(
395  size,
396  (boost::format("Cursor buffer size must be positive: %s") % size).str());
397  cursor_buffer_size_ = size;
398 
399  return *this;
400 }
401 
402 std::chrono::milliseconds
404 {
405  return timeout_;
406 }
407 
409 sql_statement::timeout(std::chrono::milliseconds timeout)
410 {
411  auto timeout_msecs = timeout.count();
412  if (timeout_msecs < 0 && timeout != TIMEOUT_NOT_SET) {
413  throw exception::illegal_argument(
414  "sql_statement::timeout(std::chrono::milliseconds timeout)",
415  (boost::format("Timeout must be non-negative or -1: %1% msecs") %
416  timeout_msecs)
417  .str());
418  }
419 
420  timeout_ = timeout;
421 
422  return *this;
423 }
424 
425 const boost::optional<std::string>&
427 {
428  return schema_;
429 }
430 
432 sql_statement::schema(boost::optional<std::string> schema)
433 {
434  schema_ = std::move(schema);
435  return *this;
436 }
437 
438 sql::sql_expected_result_type
440 {
441  return expected_result_type_;
442 }
443 
445 sql_statement::expected_result_type(sql::sql_expected_result_type type)
446 {
447  expected_result_type_ = type;
448  return *this;
449 }
450 
451 hazelcast_sql_exception::hazelcast_sql_exception(
452  std::string source,
453  boost::uuids::uuid originating_member_id,
454  int32_t code,
455  boost::optional<std::string> message,
456  boost::optional<std::string> suggestion,
457  std::exception_ptr cause)
458  : hazelcast_(std::move(source),
459  message ? std::move(message).value() : "",
460  "",
461  std::move(cause))
462  , originating_member_id_(std::move(originating_member_id))
463  , code_(code)
464  , suggestion_(std::move(suggestion))
465 {
466 }
467 
468 const boost::uuids::uuid&
470 {
471  return originating_member_id_;
472 }
473 
474 int32_t
476 {
477  return code_;
478 }
479 
480 const boost::optional<std::string>&
482 {
483  return suggestion_;
484 }
485 
486 namespace impl {
487 
488 std::ostream& operator<<(std::ostream& os, const query_id& id)
489 {
490  os << "query_id{member_id: " << boost::uuids::to_string(id.member_id)
491  << " local_id: " << boost::uuids::to_string(id.local_id) << "}";
492  return os;
493 }
494 
495 void
496 query_utils::throw_public_exception(std::exception_ptr exc,
497  boost::uuids::uuid id)
498 {
499  try {
500  std::rethrow_exception(exc);
501  } catch (hazelcast_sql_exception& e) {
502  throw;
503  } catch (exception::query& e) {
504  auto originating_member_id = e.originating_member_uuid();
505  if (originating_member_id.is_nil()) {
506  originating_member_id = id;
507  }
508 
509  throw hazelcast_sql_exception("query_utils::throw_public_exception",
510  originating_member_id,
511  e.code(),
512  e.get_message(),
513  e.suggestion(),
514  exc);
515  } catch (exception::iexception& ie) {
516  throw hazelcast_sql_exception(
517  "query_utils::throw_public_exception",
518  id,
519  static_cast<int32_t>(sql_error_code::GENERIC),
520  ie.get_message(),
521  boost::none,
522  exc);
523  }
524 }
525 
526 boost::optional<member>
527 query_utils::member_of_same_larger_version_group(
528  const std::vector<member>& members)
529 {
530  // The members should have at most 2 different version (ignoring the patch
531  // version). Find a random member from the larger same-version group.
532 
533  // we don't use 2-element array to save on litter
534  boost::optional<member::version> version0;
535  boost::optional<member::version> version1;
536  size_t count0 = 0;
537  size_t count1 = 0;
538 
539  for (const auto& m : members) {
540  if (m.is_lite_member()) {
541  continue;
542  }
543  auto v = m.get_version();
544  if (!version0 || *version0 == v) {
545  version0 = v;
546  ++count0;
547  } else if (!version1 || *version1 == v) {
548  version1 = v;
549  ++count1;
550  } else {
551  throw exception::runtime(
552  "query_utils::member_of_same_larger_version_group",
553  (boost::format(
554  "More than 2 distinct member versions found: %1% , %2%") %
555  version0 % version1)
556  .str());
557  }
558  }
559 
560  assert(count1 == 0 || count0 > 0);
561 
562  // no data members
563  if (count0 == 0) {
564  return boost::none;
565  }
566 
567  size_t count;
568  member::version version;
569  if (count0 > count1 || (count0 == count1 && *version0 > *version1)) {
570  count = count0;
571  version = *version0;
572  } else {
573  count = count1;
574  version = *version1;
575  }
576 
577  // otherwise return a random member from the larger group
578  static thread_local std::mt19937 generator;
579  std::uniform_int_distribution<int> distribution(0, count - 1);
580  auto random_member_index = distribution(generator);
581  for (const auto& m : members) {
582  if (!m.is_lite_member() && m.get_version() == version) {
583  random_member_index--;
584  if (random_member_index < 0) {
585  return m;
586  }
587  }
588  }
589 
590  throw exception::runtime("query_utils::member_of_same_larger_version_group",
591  "should never get here");
592 }
593 
594 } // namespace impl
595 
596 sql_result::sql_result(
597  spi::ClientContext* client_context,
598  sql_service* service,
599  std::shared_ptr<connection::Connection> connection,
600  impl::query_id id,
601  int64_t update_count,
602  std::shared_ptr<sql_row_metadata> row_metadata,
603  std::shared_ptr<sql_page> first_page,
604  int32_t cursor_buffer_size)
605  : client_context_(client_context)
606  , service_(service)
607  , connection_(std::move(connection))
608  , query_id_(id)
609  , update_count_(update_count)
610  , row_metadata_(std::move(row_metadata))
611  , first_page_(std::move(first_page))
612  , iterator_requested_(false)
613  , closed_(false)
614  , cursor_buffer_size_(cursor_buffer_size)
615 {
616  if (row_metadata_) {
617  assert(first_page_);
618  first_page_->row_metadata(row_metadata_);
619  first_page_->serialization_service(
620  &client_context_->get_serialization_service());
621  update_count_ = -1;
622  } else {
623  closed_ = true;
624  }
625 }
626 
627 int64_t
629 {
630  return update_count_;
631 }
632 
633 bool
635 {
636  return update_count() == -1;
637 }
638 
641 {
642  check_closed();
643 
644  if (!first_page_) {
645  BOOST_THROW_EXCEPTION(exception::illegal_state(
646  "sql_result::iterator", "This result contains only update count"));
647  }
648 
649  if (iterator_requested_) {
650  BOOST_THROW_EXCEPTION(exception::illegal_state(
651  "sql_result::page_iterator", "Iterator can be requested only once"));
652  }
653 
654  iterator_requested_ = true;
655 
656  return { shared_from_this(), first_page_ };
657 }
658 
659 void
660 sql_result::check_closed() const
661 {
662  if (closed_) {
663  impl::query_utils::throw_public_exception(
664  std::make_exception_ptr(exception::query(
665  static_cast<int32_t>(impl::sql_error_code::CANCELLED_BY_USER),
666  "Query was cancelled by the user")),
667  service_->client_id());
668  }
669 }
670 
671 boost::future<void>
673 {
674  if (closed_) {
675  return boost::make_ready_future();
676  }
677 
678  auto release_resources = [this](){
679  {
680  std::lock_guard<std::mutex> guard{ mtx_ };
681  closed_ = true;
682 
683  connection_.reset();
684  }
685 
686  row_metadata_.reset();
687  first_page_.reset();
688  };
689 
690  try
691  {
692  auto f = service_->close(connection_, query_id_);
693 
694  release_resources();
695 
696  return f;
697  } catch (const std::exception& e) {
698  release_resources();
699 
700  service_->rethrow(e);
701  }
702 
703  // This should not be reached.
704  return boost::make_ready_future();
705 }
706 
707 boost::future<std::shared_ptr<sql_page>>
708 sql_result::fetch_page()
709 {
710  std::lock_guard<std::mutex> guard{ mtx_ };
711 
712  check_closed();
713  return service_->fetch_page(query_id_, cursor_buffer_size_, connection_);
714 }
715 
716 const sql_row_metadata&
718 {
719  if (!row_metadata_) {
720  throw exception::illegal_state(
721  "sql_result::row_metadata", "This result contains only update count");
722  }
723 
724  return *row_metadata_;
725 }
726 
728 {
729  try {
730  close().get();
731  } catch (...) {
732  // ignore
733  HZ_LOG(client_context_->get_logger(),
734  info,
735  (boost::format("[sql_result::~sql_result()] Exception while "
736  "closing the query result. Query id: %1%") %
737  query_id_)
738  .str());
739  }
740 }
741 
742 sql_result::page_iterator::page_iterator(std::shared_ptr<sql_result> result,
743  std::shared_ptr<sql_page> first_page)
744  : in_progress_{ std::make_shared<std::atomic<bool>>(false) }
745  , last_{ std::make_shared<std::atomic<bool>>(false) }
746  , row_metadata_{ result->row_metadata_ }
747  , serialization_(&result->client_context_->get_serialization_service())
748  , result_{ move(result) }
749  , first_page_(move(first_page))
750 {
751 }
752 
753 boost::future<std::shared_ptr<sql_page>>
755 {
756  result_->check_closed();
757 
758  if (first_page_) {
759  auto page = move(first_page_);
760 
761  page->serialization_service(serialization_);
762  page->row_metadata(row_metadata_);
763  *last_ = page->last();
764 
765  return boost::make_ready_future<std::shared_ptr<sql_page>>(page);
766  }
767 
768  if (*in_progress_) {
769  BOOST_THROW_EXCEPTION(
770  exception::illegal_access("sql_result::page_iterator::next",
771  "Fetch page operation is already in "
772  "progress so next must not be called."));
773  }
774 
775  if (*last_) {
776  BOOST_THROW_EXCEPTION(exception::no_such_element(
777  "sql_result::page_iterator::next",
778  "Last page is already retrieved so there are no more pages."));
779  }
780 
781  *in_progress_ = true;
782 
783  auto page_future = result_->fetch_page();
784 
785  std::weak_ptr<std::atomic<bool>> last_w{ last_ };
786  std::weak_ptr<std::atomic<bool>> in_progress_w{ in_progress_ };
787  std::shared_ptr<sql_row_metadata> row_metadata{ row_metadata_ };
788  auto result = result_;
789  auto serialization_service = serialization_;
790 
791  return page_future.then(
792  boost::launch::sync,
793  [serialization_service, row_metadata, last_w, in_progress_w, result](
794  boost::future<std::shared_ptr<sql_page>> page_f) {
795  try {
796  auto page = page_f.get();
797 
798  result->check_closed();
799  page->serialization_service(serialization_service);
800  page->row_metadata(move(row_metadata));
801 
802  auto last = last_w.lock();
803 
804  if (last)
805  *last = page->last();
806 
807  auto in_progress = in_progress_w.lock();
808 
809  if (in_progress)
810  *in_progress = false;
811 
812  return page;
813  } catch (...) {
814  auto in_progress = in_progress_w.lock();
815 
816  if (in_progress)
817  *in_progress = false;
818 
819  throw;
820  }
821  });
822 }
823 
824 bool
826 {
827  result_->check_closed();
828  return !*last_;
829 }
830 
831 std::size_t
832 sql_page::page_data::column_count() const
833 {
834  return column_types_.size();
835 }
836 
837 std::size_t
838 sql_page::page_data::row_count() const
839 {
840  return columns_[0].size();
841 }
842 
843 sql_page::sql_row::sql_row(size_t row_index,
844  std::shared_ptr<page_data> shared)
845  : row_index_(row_index)
846  , page_data_(std::move(shared))
847 {
848 }
849 
850 std::size_t
851 sql_page::sql_row::resolve_index(const std::string& column_name) const
852 {
853  auto it = page_data_->row_metadata_->find_column(column_name);
854  if (it == page_data_->row_metadata_->end()) {
855  throw exception::illegal_argument(
856  "sql_page::get_object(const std::string &)",
857  (boost::format("Column %1% doesn't exist") % column_name).str());
858  }
859  auto column_index = it->second;
860  return column_index;
861 }
862 
863 const sql_row_metadata&
865 {
866  return *page_data_->row_metadata_;
867 }
868 
869 void
870 sql_page::sql_row::check_index(size_t index) const
871 {
872  if (index >= row_metadata().column_count()) {
873  throw exception::index_out_of_bounds(
874  "sql_page::sql_row::check_index",
875  (boost::format("Column index is out of range: %1%") % index).str());
876  }
877 }
878 
879 sql_page::sql_page(std::vector<sql_column_type> column_types,
880  std::vector<column> columns,
881  bool last,
882  std::shared_ptr<sql_row_metadata> row_metadata)
883  : page_data_{ new page_data{ std::move(column_types),
884  std::move(columns),
885  std::move(row_metadata),
886  nullptr } }
887  , last_(last)
888 {
889 }
890 
891 void
892 sql_page::construct_rows()
893 {
894  auto count = row_count();
895  rows_.clear();
896  for (size_t i = 0; i < count; ++i) {
897  rows_.emplace_back(i, page_data_);
898  }
899 }
900 
901 const std::vector<sql_column_type>&
903 {
904  return page_data_->column_types_;
905 }
906 
907 bool
909 {
910  return last_;
911 }
912 
913 std::size_t
915 {
916  return page_data_->column_count();
917 }
918 
919 std::size_t
921 {
922  return page_data_->row_count();
923 }
924 
925 const std::vector<sql_page::sql_row>&
927 {
928  return rows_;
929 }
930 
931 void
932 sql_page::row_metadata(std::shared_ptr<sql_row_metadata> row_meta)
933 {
934  page_data_->row_metadata_ = std::move(row_meta);
935 }
936 
937 void
938 sql_page::serialization_service(serialization::pimpl::SerializationService* ss)
939 {
940  page_data_->serialization_service_ = ss;
941 }
942 
943 sql_row_metadata::sql_row_metadata(std::vector<sql_column_metadata> columns)
944  : columns_(std::move(columns))
945 {
946  assert(!columns_.empty());
947 
948  name_to_index_.reserve(columns_.size());
949  for (std::size_t i = 0; i < columns_.size(); ++i) {
950  name_to_index_.emplace(columns_[i].name, i);
951  }
952 }
953 
954 std::size_t
956 {
957  return columns_.size();
958 }
959 
960 const sql_column_metadata&
961 sql_row_metadata::column(std::size_t index) const
962 {
963  if (index >= columns_.size()) {
964  throw exception::index_out_of_bounds(
965  "sql_row_metadata::column(std::size_t index)",
966  (boost::format("Column index is out of bounds: %1%") % index).str());
967  }
968 
969  return columns_[index];
970 }
971 
972 const std::vector<sql_column_metadata>&
974 {
975  return columns_;
976 }
977 
979 sql_row_metadata::find_column(const std::string& column_name) const
980 {
981  return name_to_index_.find(column_name);
982 }
983 
986 {
987  return name_to_index_.end();
988 }
989 bool
990 operator==(const sql_row_metadata& lhs, const sql_row_metadata& rhs)
991 {
992  return lhs.columns_ == rhs.columns_;
993 }
994 
995 bool
996 operator==(const sql_column_metadata& lhs, const sql_column_metadata& rhs)
997 {
998  return lhs.name == rhs.name && lhs.type == rhs.type &&
999  lhs.nullable == rhs.nullable;
1000 }
1001 
1002 } // namespace sql
1003 } // namespace client
1004 } // namespace hazelcast
Base class for all exceptions originating from Hazelcast methods.
Definition: iexception.h:49
int32_t code() const
Gets the internal error code associated with the exception.
Definition: sql.cpp:475
const boost::uuids::uuid & originating_member_id() const
Gets ID of the member that caused or initiated an error condition.
Definition: sql.cpp:469
const boost::optional< std::string > & suggestion() const
Gets the suggested SQL statement to remediate experienced error.
Definition: sql.cpp:481
const sql_row_metadata & row_metadata() const
Gets the row metadata.
Definition: sql.cpp:864
std::size_t row_count() const
Returns the number of rows in this page.
Definition: sql.cpp:920
sql_page(std::vector< sql_column_type > column_types, std::vector< column > columns, bool last, std::shared_ptr< sql_row_metadata > row_metadata=nullptr)
Constructs an sql_page from the response returned from the server.
Definition: sql.cpp:879
std::size_t column_count() const
Returns the number of columns in each row.
Definition: sql.cpp:914
const std::vector< sql_row > & rows() const
Returns the rows of this page.
Definition: sql.cpp:926
const std::vector< sql_column_type > & column_types() const
Returns the types of the columns in each row.
Definition: sql.cpp:902
bool has_next() const
Tells whether there are pages to be retrieved.
Definition: sql.cpp:825
boost::future< std::shared_ptr< sql_page > > next()
Fetches the new page.
Definition: sql.cpp:754
bool row_set() const
Return whether this result has rows to iterate using the iterator() method.
Definition: sql.cpp:634
const sql_row_metadata & row_metadata() const
Gets the row metadata.
Definition: sql.cpp:717
page_iterator iterator()
Returns an iterator over the result pages.
Definition: sql.cpp:640
virtual ~sql_result()
The destructor closes the result if it is still open.
Definition: sql.cpp:727
int64_t update_count() const
Returns the number of rows updated by the statement or -1 if this result is a row set.
Definition: sql.cpp:628
boost::future< void > close()
Release the resources associated with the query result.
Definition: sql.cpp:672
std::unordered_map< std::string, std::size_t >::const_iterator const_iterator
key is the column name, value is the column index.
const sql_column_metadata & column(std::size_t index) const
Gets column metadata.
Definition: sql.cpp:961
std::size_t column_count() const
Gets the number of columns in the row.
Definition: sql.cpp:955
const_iterator end() const
Constant indicating that the column is not found.
Definition: sql.cpp:985
const_iterator find_column(const std::string &column_name) const
Find index of the column with the given name.
Definition: sql.cpp:979
const std::vector< sql_column_metadata > & columns() const
Gets columns metadata.
Definition: sql.cpp:973
Definition of an SQL statement.
Definition: sql_statement.h:42
const boost::optional< std::string > & schema() const
Gets the schema name.
Definition: sql.cpp:426
std::chrono::milliseconds timeout() const
Gets the execution timeout in milliseconds.
Definition: sql.cpp:403
sql_statement & clear_parameters()
Clears statement parameter values.
Definition: sql.cpp:378
sql_statement(hazelcast_client &client, std::string query)
Creates a statement with the given query.
Definition: sql.cpp:337
int32_t cursor_buffer_size() const
Gets the cursor buffer size (measured in the number of rows).
Definition: sql.cpp:386
const std::string & sql() const
Definition: sql.cpp:362
static constexpr std::chrono::milliseconds TIMEOUT_NOT_SET
Value for the timeout that is not set.
Definition: sql_statement.h:48
sql_expected_result_type expected_result_type() const
Gets the expected result type.
Definition: sql.cpp:439