Hazelcast C++ Client
Hazelcast C++ Client Library
All Classes Functions Variables Enumerations Enumerator Pages
pipelining.h
1 /*
2  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <vector>
19 #include <mutex>
20 #include <condition_variable>
21 
22 #include <boost/thread/future.hpp>
23 
24 #include "hazelcast/util/Preconditions.h"
25 
26 namespace hazelcast {
27  namespace client {
75  template<typename E>
76  class pipelining : public std::enable_shared_from_this<pipelining<E> > {
77  public:
89  static std::shared_ptr<pipelining> create(int depth) {
90  util::Preconditions::check_positive(depth, "depth must be positive");
91 
92  return std::shared_ptr<pipelining>(new pipelining(depth));
93  }
94 
105  std::vector<boost::optional<E>> results() {
106  std::vector<boost::optional<E>> result;
107  result.reserve(futures_.size());
108  auto result_futures = when_all(futures_.begin(), futures_.end());
109  for (auto &f : result_futures.get()) {
110  result.emplace_back(f.get());
111  }
112  return result;
113  }
114 
124  void add(boost::future<boost::optional<E>> future) {
125  down();
126 
127  futures_.push_back(future.share());
128  }
129 
130  private:
131  pipelining(int depth) : permits_(util::Preconditions::check_positive(depth, "depth must be positive")) {
132  }
133 
134  void down() {
135  int usedPermits = 0;
136  for (auto &f : futures_) {
137  if (!f.is_ready()) {
138  ++usedPermits;
139  }
140  }
141  if (usedPermits >= permits_) {
142  decltype(futures_) outStandingFutures;
143  for (auto &f : futures_) {
144  if (!f.is_ready()) {
145  outStandingFutures.push_back(f);
146  }
147  }
148 
149  if (!outStandingFutures.empty()) {
150  boost::when_any(outStandingFutures.begin(), outStandingFutures.end()).get();
151  }
152  }
153  }
154 
155  int permits_;
156  std::vector<boost::shared_future<boost::optional<E>>> futures_;
157  };
158  }
159 }
static std::shared_ptr< pipelining > create(int depth)
Creates a Pipelining with the given depth.
Definition: pipelining.h:89
std::vector< boost::optional< E > > results()
Returns the results.
Definition: pipelining.h:105
void add(boost::future< boost::optional< E >> future)
Adds a future to this Pipelining or blocks until there is capacity to add the future to the Pipelinin...
Definition: pipelining.h:124