18 #include "hazelcast/client/item_listener.h"
19 #include "hazelcast/client/proxy/IQueueImpl.h"
20 #include "hazelcast/client/impl/ItemEventHandler.h"
21 #include "hazelcast/client/protocol/codec/codecs.h"
29 class iqueue :
public proxy::IQueueImpl {
30 friend class spi::ProxyManager;
32 static constexpr
const char *SERVICE_NAME =
"hz:impl:queueService";
49 std::unique_ptr<impl::item_event_handler<protocol::codec::queue_addlistener_handler>> itemEventHandler(
50 new impl::item_event_handler<protocol::codec::queue_addlistener_handler>(
51 get_name(), get_context().get_client_cluster_service(),
52 get_context().get_serialization_service(),
56 return proxy::IQueueImpl::add_item_listener(std::move(itemEventHandler), include_value);
67 boost::future<bool>
offer(
const E &element) {
68 return offer(element, std::chrono::milliseconds(0));
76 boost::future<void>
put(
const E &element) {
77 return proxy::IQueueImpl::put(to_data(element));
90 boost::future<bool>
offer(
const E &element, std::chrono::milliseconds timeout) {
91 return proxy::IQueueImpl::offer(to_data(element), timeout);
99 boost::future<boost::optional<E>>
take() {
100 return to_object<E>(proxy::IQueueImpl::take_data());
109 boost::future<boost::optional<E>>
poll(std::chrono::milliseconds timeout) {
110 return to_object<E>(proxy::IQueueImpl::poll_data(timeout));
119 boost::future<bool>
remove(
const E &element) {
120 return proxy::IQueueImpl::remove(to_data(element));
130 return proxy::IQueueImpl::contains(to_data(element));
140 boost::future<size_t>
drain_to(std::vector<E> &elements) {
141 return proxy::IQueueImpl::drain_to_data().then(boost::launch::sync,
142 [&](boost::future<std::vector<serialization::pimpl::data>> f) {
143 return drain_items(std::move(f), elements);
155 boost::future<size_t>
drain_to(std::vector<E> &elements,
size_t max_elements) {
156 return proxy::IQueueImpl::drain_to_data(max_elements).then(boost::launch::sync,
157 [&](boost::future<std::vector<serialization::pimpl::data>> f) {
158 return drain_items(std::move(f),
169 boost::future<boost::optional<E>>
poll() {
170 return poll<E>(std::chrono::milliseconds(0));
179 boost::future<boost::optional<E>>
peek() {
180 return to_object<E>(proxy::IQueueImpl::peek_data());
189 return to_object_vector<E>(proxy::IQueueImpl::to_array_data());
199 return proxy::IQueueImpl::contains_all_data(to_data_collection(elements));
208 boost::future<bool>
add_all(
const std::vector<E> &elements) {
209 return proxy::IQueueImpl::add_all_data(to_data_collection(elements));
218 boost::future<bool>
remove_all(
const std::vector<E> &elements) {
219 return proxy::IQueueImpl::remove_all_data(to_data_collection(elements));
229 boost::future<bool>
retain_all(
const std::vector<E> &elements) {
230 return proxy::IQueueImpl::retain_all_data(to_data_collection(elements));
234 iqueue(
const std::string &instance_name, spi::ClientContext *context) : proxy::IQueueImpl(instance_name,
238 size_t drain_items(boost::future<std::vector<serialization::pimpl::data>> f, std::vector<E> &elements) {
239 auto datas = f.get();
240 auto size = datas.size();
241 elements.reserve(size);
242 auto &ss = get_context().get_serialization_service();
243 for (
auto &data : datas) {
244 elements.push_back(ss.template to_object<E>(data).value());
Concurrent, blocking, distributed, observable, client queue.
boost::future< boost::optional< E > > poll(std::chrono::milliseconds timeout)
boost::future< bool > remove(const E &element)
boost::future< bool > contains(const E &element)
boost::future< boost::optional< E > > peek()
Returns immediately without waiting.
boost::future< bool > add_all(const std::vector< E > &elements)
boost::future< size_t > drain_to(std::vector< E > &elements, size_t max_elements)
Note that the drained elements are appended to the vector with push_back.
boost::future< bool > offer(const E &element, std::chrono::milliseconds timeout)
Inserts the specified element into this queue.
boost::future< boost::optional< E > > poll()
Returns immediately without waiting.
boost::future< bool > remove_all(const std::vector< E > &elements)
boost::future< bool > contains_all(const std::vector< E > &elements)
boost::future< std::vector< E > > to_array()
boost::future< void > put(const E &element)
Puts the element into the queue.
boost::future< bool > offer(const E &element)
Inserts the specified element into this queue.
boost::future< boost::uuids::uuid > add_item_listener(item_listener &&listener, bool include_value)
Adds an item listener for this collection.
boost::future< bool > retain_all(const std::vector< E > &elements)
Removes from this queue all elements that are not present in the given "elements" vector.
boost::future< size_t > drain_to(std::vector< E > &elements)
Note that the drained elements are appended to the vector with push_back.
boost::future< boost::optional< E > > take()
Item listener for IQueue, ISet and IList.