21 #include <boost/endian/conversion.hpp>
23 #include <hazelcast/client/impl/metrics/metric_descriptor.h>
24 #include <hazelcast/client/impl/metrics/metrics_compressor.h>
25 #include <hazelcast/client/impl/metrics/metrics_dictionary.h>
26 #include <hazelcast/util/byte.h>
35 constexpr
int MAX_WORD_LENGTH = 255;
36 constexpr
int NULL_DICTIONARY_ID = -1;
38 constexpr
byte MASK_PREFIX = 1 << 0;
39 constexpr
byte MASK_METRIC = 1 << 1;
40 constexpr
byte MASK_DISCRIMINATOR = 1 << 2;
41 constexpr
byte MASK_DISCRIMINATOR_VALUE = 1 << 3;
42 constexpr
byte MASK_UNIT = 1 << 4;
43 constexpr
byte MASK_EXCLUDED_TARGETS = 1 << 5;
44 constexpr
byte MASK_TAG_COUNT = 1 << 6;
46 constexpr
int VALUE_TYPE_LONG = 0;
48 constexpr
byte BINARY_FORMAT_VERSION = 1;
51 find_common_prefix_length(
const std::string& s1,
const std::string& s2)
54 while (len < s1.size() && len < s2.size() && s1[len] == s2[len]) {
69 zlib_compress(
const std::vector<byte>& input)
71 constexpr std::size_t max_block_size = (1 << 16) - 1;
73 const size_t num_blocks =
74 (std::max)(
static_cast<std::size_t
>(1),
75 (input.size() + max_block_size - 1) / max_block_size);
77 std::vector<byte> output;
80 output.reserve(input.size()
87 output.push_back(
static_cast<byte>(120));
88 output.push_back(
static_cast<byte>(1));
90 constexpr
long adler32_mod = 65521;
93 for (std::size_t block_start = 0U;
94 block_start == 0U || block_start < input.size();
95 block_start += max_block_size) {
96 const auto block_end =
97 (std::min)(block_start + max_block_size, input.size());
98 const auto block_size = block_end - block_start;
100 const bool is_final = block_end == input.size();
104 static_cast<byte>(is_final));
105 output.push_back(block_size & 0xff);
106 output.push_back(block_size >> 8);
107 output.push_back((~block_size) & 0xff);
108 output.push_back((~block_size) >> 8);
111 for (std::size_t i = block_start; i < block_end; i++) {
112 const auto x = input[i];
117 if (a1 >= adler32_mod) {
122 if (a2 >= adler32_mod) {
129 output.push_back(a2 >> 8);
130 output.push_back(a2 & 0xff);
131 output.push_back(a1 >> 8);
132 output.push_back(a1 & 0xff);
139 metric_descriptor::metric_descriptor(std::string prefix,
141 std::string discriminator,
142 std::string discriminator_value,
144 : prefix_{ std::move(prefix) }
145 , metric_{ std::move(metric) }
146 , discriminator_{ std::move(discriminator) }
147 , discriminator_value_{ std::move(discriminator_value) }
151 metric_descriptor::metric_descriptor(std::string prefix,
154 : prefix_{ std::move(prefix) }
155 , metric_{ std::move(metric) }
157 , discriminator_value_{}
162 metric_descriptor::prefix()
const
168 metric_descriptor::metric()
const
173 const boost::optional<std::string>&
174 metric_descriptor::discriminator()
const
176 return discriminator_;
179 const boost::optional<std::string>&
180 metric_descriptor::discriminator_value()
const
182 return discriminator_value_;
186 metric_descriptor::unit()
const
192 metrics_dictionary::get_dictionary_id(
const std::string& word)
194 if (word.size() > MAX_WORD_LENGTH) {
195 throw std::invalid_argument(
"too long value in metric descriptor");
198 auto word_position = word_to_id.find(word);
200 if (word_position == word_to_id.end()) {
201 const int next_id =
static_cast<int>(word_to_id.size());
202 word_to_id[word] = next_id;
206 return word_position->second;
210 metrics_dictionary::const_iterator
211 metrics_dictionary::begin() const noexcept
213 return word_to_id.cbegin();
216 metrics_dictionary::const_iterator
217 metrics_dictionary::end() const noexcept
219 return word_to_id.cend();
223 metrics_dictionary::size() const noexcept
225 return word_to_id.size();
229 output_buffer::write(
byte val)
231 buffer_.push_back(val);
235 output_buffer::write(int32_t val)
237 auto pos = buffer_.size();
238 buffer_.resize(pos +
sizeof(int32_t));
240 boost::endian::endian_store<int32_t,
242 boost::endian::order::big>(buffer_.data() + pos,
247 output_buffer::write(int64_t val)
249 auto pos = buffer_.size();
250 buffer_.resize(pos +
sizeof(int64_t));
252 boost::endian::endian_store<int64_t,
254 boost::endian::order::big>(buffer_.data() + pos,
259 output_buffer::write(
const std::string& str)
262 buffer_.push_back(
static_cast<byte>(0));
263 buffer_.push_back(c);
268 output_buffer::write(
const std::vector<byte>& vec)
270 buffer_.insert(buffer_.end(), vec.begin(), vec.end());
273 const std::vector<byte>&
274 output_buffer::content()
const
280 output_buffer::content()
286 metrics_compressor::add_long(
const metric_descriptor& descriptor, int64_t value)
288 write_descriptor(descriptor);
289 metrics_buffer_.write(
static_cast<byte>(VALUE_TYPE_LONG));
290 metrics_buffer_.write(value);
294 metrics_compressor::get_blob()
298 std::vector<byte> compressed_dictionary =
299 zlib_compress(dictionary_buffer_.content());
300 std::vector<byte> compressed_metrics =
301 zlib_compress(metrics_buffer_.content());
305 blob.write(
static_cast<byte>(0));
306 blob.write(BINARY_FORMAT_VERSION);
307 blob.write(
static_cast<int32_t
>(compressed_dictionary.size()));
308 blob.write(compressed_dictionary);
309 blob.write(
static_cast<int32_t
>(metrics_count));
310 blob.write(compressed_metrics);
312 return std::move(blob.content());
316 metrics_compressor::calculate_descriptor_mask(
317 const metric_descriptor& descriptor)
321 if (last_descriptor_) {
322 if (descriptor.prefix() == last_descriptor_->prefix()) {
326 if (descriptor.metric() == last_descriptor_->metric()) {
330 if (descriptor.discriminator() == last_descriptor_->discriminator()) {
331 mask |= MASK_DISCRIMINATOR;
334 if (descriptor.discriminator_value() ==
335 last_descriptor_->discriminator_value()) {
336 mask |= MASK_DISCRIMINATOR_VALUE;
339 if (descriptor.unit() == last_descriptor_->unit()) {
344 mask |= MASK_EXCLUDED_TARGETS;
345 mask |= MASK_TAG_COUNT;
352 metrics_compressor::get_dictionary_id(
const boost::optional<std::string>& word)
355 return NULL_DICTIONARY_ID;
358 return static_cast<int32_t
>(dictionary_.get_dictionary_id(word.get()));
362 metrics_compressor::write_descriptor(
const metric_descriptor& descriptor)
364 byte mask = calculate_descriptor_mask(descriptor);
366 metrics_buffer_.write(mask);
368 if ((mask & MASK_PREFIX) == 0) {
369 metrics_buffer_.write(get_dictionary_id(descriptor.prefix()));
372 if ((mask & MASK_METRIC) == 0) {
373 metrics_buffer_.write(get_dictionary_id(descriptor.metric()));
376 if ((mask & MASK_DISCRIMINATOR) == 0) {
377 metrics_buffer_.write(get_dictionary_id(descriptor.discriminator()));
380 if ((mask & MASK_DISCRIMINATOR_VALUE) == 0) {
381 metrics_buffer_.write(
static_cast<int32_t
>(
382 get_dictionary_id(descriptor.discriminator_value())));
385 if ((mask & MASK_UNIT) == 0) {
386 metrics_buffer_.write(
static_cast<byte>(descriptor.unit()));
389 if ((mask & MASK_EXCLUDED_TARGETS) == 0) {
390 metrics_buffer_.write(
static_cast<byte>(0));
393 if ((mask & MASK_TAG_COUNT) == 0) {
394 metrics_buffer_.write(
static_cast<byte>(0));
397 last_descriptor_ = descriptor;
402 metrics_compressor::write_dictionary()
404 std::string last_word{
"" };
406 dictionary_buffer_.write(
static_cast<int32_t
>(dictionary_.size()));
408 for (
const auto& item : dictionary_) {
409 const auto& word = item.first;
410 const auto&
id = item.second;
412 auto common_len = find_common_prefix_length(last_word, word);
414 dictionary_buffer_.write(
static_cast<int32_t
>(
id));
415 dictionary_buffer_.write(
static_cast<byte>(common_len));
416 dictionary_buffer_.write(
static_cast<byte>(word.size() - common_len));
417 dictionary_buffer_.write(word.substr(common_len));