20 #include <condition_variable>
22 #include <boost/thread/future.hpp>
24 #include "hazelcast/util/Preconditions.h"
76 class pipelining :
public std::enable_shared_from_this<pipelining<E> > {
89 static std::shared_ptr<pipelining>
create(
int depth) {
90 util::Preconditions::check_positive(depth,
"depth must be positive");
92 return std::shared_ptr<pipelining>(
new pipelining(depth));
106 std::vector<boost::optional<E>> result;
107 result.reserve(futures_.size());
108 auto result_futures = when_all(futures_.begin(), futures_.end());
109 for (
auto &f : result_futures.get()) {
110 result.emplace_back(f.get());
124 void add(boost::future<boost::optional<E>> future) {
127 futures_.push_back(future.share());
131 pipelining(
int depth) : permits_(util::Preconditions::check_positive(depth,
"depth must be positive")) {
136 for (
auto &f : futures_) {
141 if (usedPermits >= permits_) {
142 decltype(futures_) outStandingFutures;
143 for (
auto &f : futures_) {
145 outStandingFutures.push_back(f);
149 if (!outStandingFutures.empty()) {
150 boost::when_any(outStandingFutures.begin(), outStandingFutures.end()).get();
156 std::vector<boost::shared_future<boost::optional<E>>> futures_;
static std::shared_ptr< pipelining > create(int depth)
Creates a Pipelining with the given depth.
std::vector< boost::optional< E > > results()
Returns the results.
void add(boost::future< boost::optional< E >> future)
Adds a future to this Pipelining or blocks until there is capacity to add the future to the Pipelining.