To better understand the RealtimeMLPipeline
interface, we can take a closer look at a Real-time Features use case as an example.
To define a RealtimeMLPipeline
which encodes a real-time feature, a developer provides metadata such as a feature name and version, a query to compute it, and instantiates a RealtimeMLPipeline
Python object. That Python object is portable across all environments that we support: local test environment, notebook, staging, and production.
The following code block is an example that defines a real-time feature computation using the RealtimeMLPipeline
interface:
feature_sql = """
SELECT driver_id AS entity_id, window_start AS rowtime, count_accepted / count_total as feature_value
FROM (
SELECT driver_id,
window_start,
CAST(sum(case when status = 'accepted' then 1.0 else 0.0 end) AS DOUBLE) as count_accepted,
CAST(count(*) AS DOUBLE) as count_total
FROM TABLE(
TUMBLE(TABLE driver_notification_result, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES)
)
GROUP BY driver_id, window_start, window_end
)
"""feature_sink = DsFeaturesSink()
feature_definition = FeatureDefinition('driver_accept_proportion_10m', 'some_feature_group', Entity.DRIVER, 'float')
pipe = RealtimeMLPipeline()
pipe.query(feature_sql) \
    .register_feature(feature_definition) \
    .add_sink(feature_sink)
In this example, a feature called driver_accept_proportion_10m
is defined, which represents the proportion of notifications a driver accepts per ten-minute tumbling window.
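To make the windowed aggregation concrete, here is a minimal plain-Python sketch of the same computation: events are bucketed into ten-minute tumbling windows by flooring their timestamps, and the acceptance proportion is computed per (driver, window) pair. The event tuples and field names are illustrative, not part of the actual pipeline.

```python
from collections import defaultdict

# Hypothetical notification events: (driver_id, epoch_seconds, status)
events = [
    (1, 0, "accepted"), (1, 120, "rejected"), (1, 650, "accepted"),
    (2, 30, "accepted"), (2, 90, "accepted"),
]

WINDOW = 600  # ten-minute tumbling window, in seconds

# (driver_id, window_start) -> [count_accepted, count_total]
counts = defaultdict(lambda: [0.0, 0.0])
for driver_id, ts, status in events:
    window_start = ts - ts % WINDOW  # floor timestamp to window boundary
    counts[(driver_id, window_start)][0] += status == "accepted"
    counts[(driver_id, window_start)][1] += 1

# feature_value = count_accepted / count_total, per driver and window
features = {k: accepted / total for k, (accepted, total) in counts.items()}
print(features)  # e.g. {(1, 0): 0.5, (1, 600): 1.0, (2, 0): 1.0}
```

This mirrors what the TUMBLE table-valued function and GROUP BY in the SQL above do, except that Flink performs it incrementally over an unbounded stream.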
The RealtimeMLPipeline
object constructed can be executed in a Jupyter notebook, in a local test environment, and against staging and production Flink clusters. In a notebook and in a local test environment, the pipeline runs against an ad hoc Flink cluster and the output is written to the local file system for validation.
In staging and production, the pipeline runs against a multi-tenant, production-grade and production-scale Flink cluster and outputs computed features to Kafka, which in turn delivers them to our Feature Storage infrastructure.
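One way such environment portability could work is a fluent builder whose run() defers to whatever sink the environment provides. The sketch below is hypothetical: class and method names (Pipeline, LocalFileSink, KafkaSink) are stand-ins, not the real RealtimeMLPipeline API, and run() uses placeholder rows instead of actually submitting a Flink job.

```python
# Hypothetical sinks: a local one for notebooks/tests, a Kafka one for prod.
class LocalFileSink:
    def write(self, rows):
        return f"wrote {len(rows)} rows to the local filesystem"

class KafkaSink:
    def write(self, rows):
        return f"published {len(rows)} rows to Kafka"

class Pipeline:
    """Minimal stand-in for a fluent RealtimeMLPipeline-style builder."""

    def __init__(self):
        self._query = None
        self._sink = None

    def query(self, sql):
        self._query = sql
        return self  # return self so calls can be chained

    def add_sink(self, sink):
        self._sink = sink
        return self

    def run(self):
        rows = [("driver_1", 0.5)]  # placeholder for real Flink execution
        return self._sink.write(rows)

# The same pipeline definition, pointed at different sinks per environment:
local_result = Pipeline().query("SELECT ...").add_sink(LocalFileSink()).run()
prod_result = Pipeline().query("SELECT ...").add_sink(KafkaSink()).run()
print(local_result)
print(prod_result)
```

The point of the builder returning self from each method is exactly what makes the chained pipe.query(...).register_feature(...).add_sink(...) style in the earlier code block possible.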
Note that in the code block above we defined a RealtimeMLPipeline
as pipe
. To run it in various environments, we'd simply need to execute the following code:
pipe.run()
You can imagine the pipe
object being serialized and loaded in different environments.
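As a rough illustration of that serialize-and-load idea, the sketch below pickles a minimal pipeline-like object and restores it, as one process might hand a pipeline definition to another. PortablePipeline and its fields are invented for this example; the real RealtimeMLPipeline would carry its query, feature metadata, and sink configuration, and may well use a different serialization mechanism.

```python
import pickle

class PortablePipeline:
    """Hypothetical stand-in for a serializable pipeline definition."""

    def __init__(self, query, feature_name):
        self.query = query
        self.feature_name = feature_name

pipe = PortablePipeline("SELECT ...", "driver_accept_proportion_10m")

blob = pickle.dumps(pipe)      # serialize in one environment...
restored = pickle.loads(blob)  # ...and load it in another
print(restored.feature_name)   # driver_accept_proportion_10m
```

Because the object is just data plus configuration, the same definition can be validated locally and then shipped unchanged to a staging or production runner.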