Hazelcast C++ Client
Hazelcast C++ Client Library
pipelining.h
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <vector>
19 #include <mutex>
20 #include <condition_variable>
21 
22 #include <boost/thread/future.hpp>
23 
24 #include "hazelcast/util/Preconditions.h"
25 
26 namespace hazelcast {
27 namespace client {
80 template<typename E>
81 class pipelining : public std::enable_shared_from_this<pipelining<E>>
82 {
83 public:
99  static std::shared_ptr<pipelining> create(int depth)
100  {
101  util::Preconditions::check_positive(depth, "depth must be positive");
102 
103  return std::shared_ptr<pipelining>(new pipelining(depth));
104  }
105 
116  std::vector<boost::optional<E>> results()
117  {
118  std::vector<boost::optional<E>> result;
119  result.reserve(futures_.size());
120  auto result_futures = when_all(futures_.begin(), futures_.end());
121  for (auto& f : result_futures.get()) {
122  result.emplace_back(f.get());
123  }
124  return result;
125  }
126 
136  void add(boost::future<boost::optional<E>> future)
137  {
138  down();
139 
140  futures_.push_back(future.share());
141  }
142 
143 private:
144  pipelining(int depth)
145  : permits_(
146  util::Preconditions::check_positive(depth, "depth must be positive"))
147  {}
148 
149  void down()
150  {
151  int usedPermits = 0;
152  for (auto& f : futures_) {
153  if (!f.is_ready()) {
154  ++usedPermits;
155  }
156  }
157  if (usedPermits >= permits_) {
158  decltype(futures_) outStandingFutures;
159  for (auto& f : futures_) {
160  if (!f.is_ready()) {
161  outStandingFutures.push_back(f);
162  }
163  }
164 
165  if (!outStandingFutures.empty()) {
166  boost::when_any(outStandingFutures.begin(),
167  outStandingFutures.end())
168  .get();
169  }
170  }
171  }
172 
173  int permits_;
174  std::vector<boost::shared_future<boost::optional<E>>> futures_;
175 };
176 } // namespace client
177 } // namespace hazelcast
static std::shared_ptr< pipelining > create(int depth)
Creates a Pipelining with the given depth.
Definition: pipelining.h:99
std::vector< boost::optional< E > > results()
Returns the results.
Definition: pipelining.h:116
void add(boost::future< boost::optional< E >> future)
Adds a future to this Pipelining or blocks until there is capacity to add the future to the Pipelining.
Definition: pipelining.h:136