Hazelcast C++ Client
Hazelcast C++ Client Library
metrics.cpp
1 /*
2  * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <string>
17 #include <vector>
18 #include <iostream>
19 #include <algorithm>
20 
21 #include <boost/endian/conversion.hpp>
22 
23 #include <hazelcast/client/impl/metrics/metric_descriptor.h>
24 #include <hazelcast/client/impl/metrics/metrics_compressor.h>
25 #include <hazelcast/client/impl/metrics/metrics_dictionary.h>
26 #include <hazelcast/util/byte.h>
27 
28 namespace hazelcast {
29 namespace client {
30 namespace impl {
31 namespace metrics {
32 
33 namespace {
34 
35 constexpr int MAX_WORD_LENGTH = 255;
36 constexpr int NULL_DICTIONARY_ID = -1;
37 
38 constexpr byte MASK_PREFIX = 1 << 0;
39 constexpr byte MASK_METRIC = 1 << 1;
40 constexpr byte MASK_DISCRIMINATOR = 1 << 2;
41 constexpr byte MASK_DISCRIMINATOR_VALUE = 1 << 3;
42 constexpr byte MASK_UNIT = 1 << 4;
43 constexpr byte MASK_EXCLUDED_TARGETS = 1 << 5;
44 constexpr byte MASK_TAG_COUNT = 1 << 6;
45 
46 constexpr int VALUE_TYPE_LONG = 0;
47 
48 constexpr byte BINARY_FORMAT_VERSION = 1;
49 
/**
 * Counts the leading characters that \p s1 and \p s2 have in common.
 *
 * @param s1 first string
 * @param s2 second string
 * @return number of equal characters at the start of both strings; never more
 *         than the length of the shorter one
 */
std::size_t
find_common_prefix_length(const std::string& s1, const std::string& s2)
{
    // only the overlap of both strings can possibly match
    const std::size_t limit = (std::min)(s1.size(), s2.size());
    const auto diverge =
      std::mismatch(s1.begin(), s1.begin() + limit, s2.begin());

    return static_cast<std::size_t>(diverge.first - s1.begin());
}
59 
68 std::vector<byte>
69 zlib_compress(const std::vector<byte>& input)
70 {
71  constexpr std::size_t max_block_size = (1 << 16) - 1;
72 
73  const size_t num_blocks =
74  (std::max)(static_cast<std::size_t>(1),
75  (input.size() + max_block_size - 1) / max_block_size);
76 
77  std::vector<byte> output;
78 
79  // reserve enough space beforehand
80  output.reserve(input.size() // input size itself
81  + 2 // zlib header
82  + 4 // Adler32 checksum
83  + 5 * num_blocks // block headers
84  );
85 
86  // ZLIB header
87  output.push_back(static_cast<byte>(120)); // CMF
88  output.push_back(static_cast<byte>(1)); // FLG
89 
90  constexpr long adler32_mod = 65521;
91  long a1 = 1, a2 = 0; // accumulators for Adler32 checksum
92 
93  for (std::size_t block_start = 0U;
94  block_start == 0U || block_start < input.size();
95  block_start += max_block_size) {
96  const auto block_end =
97  (std::min)(block_start + max_block_size, input.size());
98  const auto block_size = block_end - block_start;
99 
100  const bool is_final = block_end == input.size();
101 
102  // block header
103  output.push_back(
104  static_cast<byte>(is_final)); // BFINAL = is_final, BTYPE = 00
105  output.push_back(block_size & 0xff); // LEN - least significant
106  output.push_back(block_size >> 8); // LEN - most significant
107  output.push_back((~block_size) & 0xff); // NLEN - least significant
108  output.push_back((~block_size) >> 8); // NLEN - most significant
109 
110  // copy uncompressed bytes and accumulate checksum
111  for (std::size_t i = block_start; i < block_end; i++) {
112  const auto x = input[i];
113 
114  output.push_back(x);
115 
116  a1 += x;
117  if (a1 >= adler32_mod) {
118  a1 -= adler32_mod;
119  }
120 
121  a2 += a1;
122  if (a2 >= adler32_mod) {
123  a2 -= adler32_mod;
124  }
125  }
126  }
127 
128  // Adler32 checksum
129  output.push_back(a2 >> 8);
130  output.push_back(a2 & 0xff);
131  output.push_back(a1 >> 8);
132  output.push_back(a1 & 0xff);
133 
134  return output;
135 }
136 
137 } // namespace
138 
139 metric_descriptor::metric_descriptor(std::string prefix,
140  std::string metric,
141  std::string discriminator,
142  std::string discriminator_value,
143  probe_unit unit)
144  : prefix_{ std::move(prefix) }
145  , metric_{ std::move(metric) }
146  , discriminator_{ std::move(discriminator) }
147  , discriminator_value_{ std::move(discriminator_value) }
148  , unit_{ unit }
149 {}
150 
151 metric_descriptor::metric_descriptor(std::string prefix,
152  std::string metric,
153  probe_unit unit)
154  : prefix_{ std::move(prefix) }
155  , metric_{ std::move(metric) }
156  , discriminator_{}
157  , discriminator_value_{}
158  , unit_{ unit }
159 {}
160 
161 const std::string&
162 metric_descriptor::prefix() const
163 {
164  return prefix_;
165 }
166 
167 const std::string&
168 metric_descriptor::metric() const
169 {
170  return metric_;
171 }
172 
173 const boost::optional<std::string>&
174 metric_descriptor::discriminator() const
175 {
176  return discriminator_;
177 }
178 
179 const boost::optional<std::string>&
180 metric_descriptor::discriminator_value() const
181 {
182  return discriminator_value_;
183 }
184 
185 probe_unit
186 metric_descriptor::unit() const
187 {
188  return unit_;
189 }
190 
191 int
192 metrics_dictionary::get_dictionary_id(const std::string& word)
193 {
194  if (word.size() > MAX_WORD_LENGTH) {
195  throw std::invalid_argument("too long value in metric descriptor");
196  }
197 
198  auto word_position = word_to_id.find(word);
199 
200  if (word_position == word_to_id.end()) {
201  const int next_id = static_cast<int>(word_to_id.size());
202  word_to_id[word] = next_id;
203 
204  return next_id;
205  } else {
206  return word_position->second;
207  }
208 }
209 
210 metrics_dictionary::const_iterator
211 metrics_dictionary::begin() const noexcept
212 {
213  return word_to_id.cbegin();
214 }
215 
216 metrics_dictionary::const_iterator
217 metrics_dictionary::end() const noexcept
218 {
219  return word_to_id.cend();
220 }
221 
222 std::size_t
223 metrics_dictionary::size() const noexcept
224 {
225  return word_to_id.size();
226 }
227 
228 void
229 output_buffer::write(byte val)
230 {
231  buffer_.push_back(val);
232 }
233 
234 void
235 output_buffer::write(int32_t val)
236 {
237  auto pos = buffer_.size();
238  buffer_.resize(pos + sizeof(int32_t));
239 
240  boost::endian::endian_store<int32_t,
241  sizeof(int32_t),
242  boost::endian::order::big>(buffer_.data() + pos,
243  val);
244 }
245 
246 void
247 output_buffer::write(int64_t val)
248 {
249  auto pos = buffer_.size();
250  buffer_.resize(pos + sizeof(int64_t));
251 
252  boost::endian::endian_store<int64_t,
253  sizeof(int64_t),
254  boost::endian::order::big>(buffer_.data() + pos,
255  val);
256 }
257 
258 void
259 output_buffer::write(const std::string& str)
260 {
261  for (char c : str) {
262  buffer_.push_back(static_cast<byte>(0));
263  buffer_.push_back(c);
264  }
265 }
266 
267 void
268 output_buffer::write(const std::vector<byte>& vec)
269 {
270  buffer_.insert(buffer_.end(), vec.begin(), vec.end());
271 }
272 
273 const std::vector<byte>&
274 output_buffer::content() const
275 {
276  return buffer_;
277 }
278 
279 std::vector<byte>&
280 output_buffer::content()
281 {
282  return buffer_;
283 }
284 
285 void
286 metrics_compressor::add_long(const metric_descriptor& descriptor, int64_t value)
287 {
288  write_descriptor(descriptor);
289  metrics_buffer_.write(static_cast<byte>(VALUE_TYPE_LONG));
290  metrics_buffer_.write(value);
291 }
292 
293 std::vector<byte>
294 metrics_compressor::get_blob()
295 {
296  write_dictionary();
297 
298  std::vector<byte> compressed_dictionary =
299  zlib_compress(dictionary_buffer_.content());
300  std::vector<byte> compressed_metrics =
301  zlib_compress(metrics_buffer_.content());
302 
303  output_buffer blob;
304 
305  blob.write(static_cast<byte>(0));
306  blob.write(BINARY_FORMAT_VERSION);
307  blob.write(static_cast<int32_t>(compressed_dictionary.size()));
308  blob.write(compressed_dictionary);
309  blob.write(static_cast<int32_t>(metrics_count));
310  blob.write(compressed_metrics);
311 
312  return std::move(blob.content());
313 }
314 
315 byte
316 metrics_compressor::calculate_descriptor_mask(
317  const metric_descriptor& descriptor)
318 {
319  byte mask = 0;
320 
321  if (last_descriptor_) {
322  if (descriptor.prefix() == last_descriptor_->prefix()) {
323  mask |= MASK_PREFIX;
324  }
325 
326  if (descriptor.metric() == last_descriptor_->metric()) {
327  mask |= MASK_METRIC;
328  }
329 
330  if (descriptor.discriminator() == last_descriptor_->discriminator()) {
331  mask |= MASK_DISCRIMINATOR;
332  }
333 
334  if (descriptor.discriminator_value() ==
335  last_descriptor_->discriminator_value()) {
336  mask |= MASK_DISCRIMINATOR_VALUE;
337  }
338 
339  if (descriptor.unit() == last_descriptor_->unit()) {
340  mask |= MASK_UNIT;
341  }
342 
343  // include excludedTargets and tagCount bits for compatibility purposes
344  mask |= MASK_EXCLUDED_TARGETS;
345  mask |= MASK_TAG_COUNT;
346  }
347 
348  return mask;
349 }
350 
351 int32_t
352 metrics_compressor::get_dictionary_id(const boost::optional<std::string>& word)
353 {
354  if (!word) {
355  return NULL_DICTIONARY_ID;
356  }
357 
358  return static_cast<int32_t>(dictionary_.get_dictionary_id(word.get()));
359 }
360 
361 void
362 metrics_compressor::write_descriptor(const metric_descriptor& descriptor)
363 {
364  byte mask = calculate_descriptor_mask(descriptor);
365 
366  metrics_buffer_.write(mask);
367 
368  if ((mask & MASK_PREFIX) == 0) {
369  metrics_buffer_.write(get_dictionary_id(descriptor.prefix()));
370  }
371 
372  if ((mask & MASK_METRIC) == 0) {
373  metrics_buffer_.write(get_dictionary_id(descriptor.metric()));
374  }
375 
376  if ((mask & MASK_DISCRIMINATOR) == 0) {
377  metrics_buffer_.write(get_dictionary_id(descriptor.discriminator()));
378  }
379 
380  if ((mask & MASK_DISCRIMINATOR_VALUE) == 0) {
381  metrics_buffer_.write(static_cast<int32_t>(
382  get_dictionary_id(descriptor.discriminator_value())));
383  }
384 
385  if ((mask & MASK_UNIT) == 0) {
386  metrics_buffer_.write(static_cast<byte>(descriptor.unit()));
387  }
388 
389  if ((mask & MASK_EXCLUDED_TARGETS) == 0) {
390  metrics_buffer_.write(static_cast<byte>(0));
391  }
392 
393  if ((mask & MASK_TAG_COUNT) == 0) {
394  metrics_buffer_.write(static_cast<byte>(0));
395  }
396 
397  last_descriptor_ = descriptor;
398  metrics_count++;
399 }
400 
401 void
402 metrics_compressor::write_dictionary()
403 {
404  std::string last_word{ "" };
405 
406  dictionary_buffer_.write(static_cast<int32_t>(dictionary_.size()));
407 
408  for (const auto& item : dictionary_) {
409  const auto& word = item.first;
410  const auto& id = item.second;
411 
412  auto common_len = find_common_prefix_length(last_word, word);
413 
414  dictionary_buffer_.write(static_cast<int32_t>(id));
415  dictionary_buffer_.write(static_cast<byte>(common_len));
416  dictionary_buffer_.write(static_cast<byte>(word.size() - common_len));
417  dictionary_buffer_.write(word.substr(common_len));
418 
419  last_word = word;
420  }
421 }
422 
423 } // namespace metrics
424 } // namespace impl
425 } // namespace client
426 } // namespace hazelcast