Stateless Transforms
Stateless transforms are the bread and butter of a data pipeline: they transform the input into the correct shape that is required by further, more complex transforms. The key feature of these transforms is that they do not have side-effects and they treat each item in isolation.
map
Mapping is the simplest kind of stateless transformation. It simply applies a function to the input item, and passes the output to the next stage.
StreamStage<String> names = stage.map(name -> name.toLowerCase());
filter
Similar to map, the filter is a stateless operator that applies a predicate to the input to decide whether to pass it to the output.
BatchStage<String> names = stage.filter(name -> !name.isEmpty());
flatMap
flatMap is equivalent to map, with the difference that instead of one output item you can have an arbitrary number of output items per input item. The output type is a Traverser, which is a Jet type similar to an Iterator. For example, the code below splits a sentence into individual words:
StreamStage<String> words = stage.flatMap(
sentence -> Traversers.traverseArray(sentence.split("\\W+"))
);
merge
This transform merges the contents of two streams into one. The item type in the right-hand stage must be the same or a subtype of the one in the left-hand stage. The items from both sides will be interleaved in arbitrary order.
StreamStage<Trade> tradesNewYork = pipeline
.readFrom(KafkaSources.kafka(.., "nyc"))
.withoutTimestamps();
StreamStage<Trade> tradesTokyo = pipeline
.readFrom(KafkaSources.kafka(.., "nyc"))
.withoutTimestamps();
StreamStage<Trade> tradesNyAndTokyo = tradesNewYork.merge(tradesTokyo);
mapUsingIMap
This transform looks up each incoming item from the corresponding IMap and the result of the lookup is combined with the input item.
StreamStage<Order> orders = pipeline
.readFrom(KafkaSources.kafka(.., "orders"))
.withoutTimestamps();
StreamStage<OrderDetails> details = orders.mapUsingIMap("products",
order -> order.getProductId(),
(order, product) -> new OrderDetails(order, product));
The above code can be thought of as equivalent to the following, where the input is of type Order:
public OrderDetails getOrderDetails(Order order) {
IMap<Integer, ProductDetails> map = jet.getMap("products");
ProductDetails product = map.get(order.getProductId());
return new OrderDetails(order, product);
}
See Joining Static Data to a Stream for a tutorial using this operator.
mapUsingReplicatedMap
This transform is equivalent to mapUsingIMap, with the only difference that a ReplicatedMap is used instead of an IMap.
StreamStage<Order> orders = pipeline
.readFrom(KafkaSources.kafka(.., "orders"))
.withoutTimestamps();
StreamStage<OrderDetails> details = orders.mapUsingReplicatedMap("products",
order -> order.getProductId(),
(order, product) -> new OrderDetails(order, product));
With a ReplicatedMap, as opposed to a standard IMap, every lookup is local. The downside is that the data is replicated to all the nodes, consuming more memory in the cluster.
mapUsingService
This transform takes an input and performs a mapping using a service object. Examples are an external HTTP-based service or some library which is loaded and initialized during runtime (such as a machine learning model).
The service itself is defined through a ServiceFactory
object. The
main difference between this operator and a simple map
is that the
service is initialized once per job. This is what makes it useful for
calling out to heavy-weight objects which are expensive to initialize
(such as HTTP connections).
Let's imagine an HTTP service which returns details for a product and
that we have wrapped this service in a ProductService
class:
interface ProductService {
ProductDetails getDetails(int productId);
}
We can then create a shared service factory as follows:
StreamStage<Order> orders = pipeline
.readFrom(KafkaSources.kafka(.., "orders"))
.withoutTimestamps();
ServiceFactory<?, ProductService> productService = ServiceFactories
.sharedService(ctx -> new ProductService(url))
.toNonCooperative();
"Shared" means that the service is thread-safe and can be called from multiple-threads, so only Jet will create just one instance on each node and share it among the parallel tasklets.
We also declared the service as "non-cooperative" because it makes blocking HTTP calls. Failing to do this would have severe consequences for the performance of not just your pipeline, but all the jobs running on the Jet cluster.
We can then perform a lookup on this service for each incoming order:
StreamStage<OrderDetails> details = orders.mapUsingService(productService,
(service, order) -> {
ProductDetails details = service.getDetails(order.getProductId());
return new OrderDetails(order, details);
}
);
mapUsingServiceAsync
This transform is identical to mapUsingService with
one important distinction: the service in this case supports
asynchronous calls, which are compatible with cooperative concurrency
and don't need extra threads. It also means that we can have multiple
requests in flight at the same time to maximize throughput. Instead of
the mapped value, this transform expects the user to supply a
CompletableFuture<T>
as the return value, which will be completed at
some later time.
For example, if we extend the previous ProductService
as follows:
interface ProductService {
ProductDetails getDetails(int productId);
CompletableFuture<ProductDetails> getDetailsAsync(int productId);
}
We create the shared service factory as before, this time without marking it non-cooperative, since the asynchronous calls don't block:
StreamStage<Order> orders = pipeline
.readFrom(KafkaSources.kafka(.., "orders"))
.withoutTimestamps();
ServiceFactory<?, ProductService> productService = ServiceFactories
.sharedService(ctx -> new ProductService(url));
The lookup now becomes asynchronous; note that the transform also expects you to return a CompletableFuture of the mapped result:
StreamStage<OrderDetails> details = orders.mapUsingServiceAsync(productService,
(service, order) -> {
CompletableFuture<ProductDetails> f = service.getDetailsAsync(order.getProductId());
return f.thenApply(details -> new OrderDetails(order, details));
}
);
The main advantage of using async communication is that we can have many invocations to the service in-flight at the same time which will result in better throughput.
mapUsingServiceAsyncBatched
This variant is very similar to the previous one, but instead of sending one request at a time, we can send requests in so-called "smart batches" (for a more in-depth look at the internals of Jet, see the Execution Engine section). Jet automatically groups items as they arrive and lets you send requests in batches. This can be very efficient, for example with a remote service, where instead of one round trip per request you can send requests in groups to maximize throughput. If we extend our ProductService as follows:
interface ProductService {
ProductDetails getDetails(int productId);
CompletableFuture<ProductDetails> getDetailsAsync(int productId);
CompletableFuture<List<ProductDetails>> getAllDetailsAsync(List<Integer> productIds);
}
We can then rewrite the transform as:
StreamStage<OrderDetails> details = orders.mapUsingServiceAsyncBatched(productService,
    (service, orderList) -> {
        List<Integer> productIds = orderList
            .stream()
            .map(o -> o.getProductId())
            .collect(Collectors.toList());
        CompletableFuture<List<ProductDetails>> f = service
            .getAllDetailsAsync(productIds);
        return f.thenApply(productDetailsList -> {
            List<OrderDetails> orderDetailsList = new ArrayList<>();
            for (int i = 0; i < orderList.size(); i++) {
                orderDetailsList.add(new OrderDetails(orderList.get(i), productDetailsList.get(i)));
            }
            return orderDetailsList;
        });
    }
);
As you can see, there is some more code to write to combine the results back, but this should give better throughput, provided the service is able to batch requests efficiently.
mapUsingPython
Hazelcast Jet can call Python code to perform a mapping step in the pipeline. The prerequisite is that the Jet servers run on Linux or macOS with Python installed, and that the hazelcast-jet-python module is deployed on the classpath by being present in the lib folder. Jet supports Python versions 3.5-3.7.
You are expected to define a function, conventionally named transform_list(input_list), that takes a list of strings and returns a list of strings whose items match positionally one-to-one with the input list. Jet will call this function with batches of items received by the Python mapping stage. If necessary, you can also use a custom name for the transforming function (see the sketch after the basic example below).
Internally Jet launches Python processes that execute your function. It
launches as many of them as requested by the localParallelism
setting
on the Python pipeline stage. It prepares a local virtual Python
environment for the processes to run in and they communicate with it
over the loopback network interface, using a bidirectional streaming
gRPC call.
If you have some simple Python work that fits into a single file, you can tell Jet just the name of that file, which is assumed to be a Python module file that declares transform_list:
StreamStage<String> sourceStage = sourceStage();
StreamStage<String> pythonMapped = sourceStage.apply(PythonTransforms.mapUsingPython(
new PythonServiceConfig().setHandlerFile("path/to/handler.py")));
And here's an example of handler.py:
def transform_list(input_list):
return ['reply-' + item for item in input_list]
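As noted above, the transforming function doesn't have to be called transform_list, and the number of Python processes launched per member follows the stage's local parallelism. A minimal sketch of both knobs, assuming a hypothetical function name parse_list and that PythonServiceConfig exposes a setHandlerFunction setter for the custom name:
StreamStage<String> pythonMapped = sourceStage
        .apply(PythonTransforms.mapUsingPython(
                new PythonServiceConfig()
                        .setHandlerFile("path/to/handler.py")
                        .setHandlerFunction("parse_list")))  // hypothetical custom function name
        .setLocalParallelism(2);                             // two Python processes per member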
If you have an entire Python project that you want to use from Jet, just name its base directory and Jet will upload all of it (recursively) to the cluster as a part of the submitted job. In this case you must also name the Python module that declares transform_list:
StreamStage<String> sourceStage = sourceStage();
StreamStage<String> pythonMapped = sourceStage.apply(PythonTransforms.mapUsingPython(
new PythonServiceConfig().setBaseDir("path/to/python_project")
.setHandlerModule("jet_handler"))
);
Normally your Python code will make use of non-standard libraries. Jet
recognizes the conventional requirements.txt
file in your project's
base directory and will ensure all the listed requirements are
satisfied.
Finally, Jet also recognizes the bash scripts init.sh and cleanup.sh. It will run them during the initialization and cleanup phases of the job. Regardless of the parallelism of the Python stage, these scripts run just once per job, and they run in the context of an already activated virtual environment.
One issue with making requirements.txt work is that in many production back-end environments the public internet is not available. To work around this you can pre-install all the requirements into the global (or user-local) Python environment on all Jet servers. You can also take full control by writing your own logic in init.sh that installs the dependencies into the local virtual environment, for example using pip install --find-links.
For a full tutorial, see Apply a Python Function.
hashJoin
hashJoin is a type of join where you have two or more inputs and all but one of them must be small enough to fit in memory. You can think of it as a primary input accompanied by one or more side inputs that are small enough to fit in memory. The side inputs are joined to the primary input, which can be either a batch or a streaming stage; the side inputs must be batch stages.
StreamStage<Order> orders = pipeline
.readFrom(orderSource())
.withoutTimestamps();
BatchStage<ProductDetails> productDetails = pipeline
.readFrom(productDetailsSource());
StreamStage<OrderDetails> joined = orders.hashJoin(productDetails,
onKeys(Order::productId, ProductDetails::productId),
(order, product) -> new OrderDetails(order, product)
);
The last argument to hashJoin is a function that gets the input item and the enriching item. Note that by default Jet does an outer join: if the enriching stream lacks a given key, the corresponding function parameter will be null. You can request an inner join as well:
StreamStage<OrderDetails> joined = orders.innerHashJoin(productDetails,
onKeys(Order::productId, ProductDetails::productId),
(order, product) -> new OrderDetails(order, product)
);
In this case the product argument is never null, and if a given key is missing, the input Order item is filtered out.
Jet also supports hash-joining with more streams at once through hashJoin2 and the hashJoinBuilder. Refer to their documentation for more details.
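For example, joining the orders with a second side input via hashJoin2 might look like the following (a minimal sketch: the brokers stage, the Broker type, its key accessors, and the three-argument OrderDetails constructor are hypothetical):
BatchStage<Broker> brokers = pipeline
    .readFrom(brokerSource());   // hypothetical second side input

StreamStage<OrderDetails> joined = orders.hashJoin2(
    productDetails, onKeys(Order::productId, ProductDetails::productId),
    brokers, onKeys(Order::brokerId, Broker::brokerId),
    (order, product, broker) -> new OrderDetails(order, product, broker)
);
As with hashJoin, the enriching parameters default to null for missing keys unless you use the inner-join variant.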