Heartbeat Sensor for Kafka¶
This page shows how to create a Heartbeat Sensor Orchestrator that detects new data arriving from Kafka and triggers the related Databricks Workflows.
Configuration required to create an orchestration task for the kafka source¶
- `sensor_source`: Set to `kafka` in the Heartbeat Control Table to identify this as a Kafka source.
- `data_format`: Set to `kafka` to specify the data format for reading Kafka streams.
- `heartbeat_sensor_db_table`: Database table name for the Heartbeat sensor control table (e.g., `my_database.heartbeat_sensor`).
- `lakehouse_engine_sensor_db_table`: Database table name for the lakehouse engine sensors (e.g., `my_database.lakehouse_engine_sensors`).
- `options`: Configuration options for Kafka reading:
    - `readChangeFeed`: Set to `"true"` to enable change data feed reading.
- `kafka_configs`: Kafka connection and security configurations (see the sketch after this list):
    - `kafka_bootstrap_servers_list`: Kafka server endpoints.
    - `kafka_ssl_truststore_location`: Path to the SSL truststore.
    - `truststore_pwd_secret_key`: Secret key for the truststore password.
    - `kafka_ssl_keystore_location`: Path to the SSL keystore.
    - `keystore_pwd_secret_key`: Secret key for the keystore password.
- `kafka_secret_scope`: Databricks secret scope for Kafka credentials.
- `base_checkpoint_location`: S3 path for storing checkpoint data (required if `sensor_read_type` is `streaming`).
- `domain`: Databricks workflows domain for job triggering.
- `token`: Databricks workflows token for authentication.
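For context on how the security entries fit together, the sketch below is an illustrative assumption (not the engine's internal code) of how the `kafka_configs` values and the `kafka_secret_scope` could be combined into Spark Kafka SSL reader options on Databricks, resolving the password secret keys with `dbutils.secrets.get`.

# Illustrative sketch only: the lakehouse engine performs this wiring internally.
# It shows what the kafka_configs entries and kafka_secret_scope correspond to
# when building Spark's Kafka SSL options. `dbutils` is the Databricks notebook
# utility object, available without an import inside Databricks notebooks.
product_kafka_config = {
    "kafka_bootstrap_servers_list": "KAFKA_SERVER",
    "kafka_ssl_truststore_location": "TRUSTSTORE_LOCATION",
    "truststore_pwd_secret_key": "TRUSTSTORE_PWD",
    "kafka_ssl_keystore_location": "KEYSTORE_LOCATION",
    "keystore_pwd_secret_key": "KEYSTORE_PWD",
}
kafka_secret_scope = "DB_SECRET_SCOPE"

spark_kafka_options = {
    "kafka.bootstrap.servers": product_kafka_config["kafka_bootstrap_servers_list"],
    "kafka.ssl.truststore.location": product_kafka_config["kafka_ssl_truststore_location"],
    # Passwords are resolved from the Databricks secret scope.
    "kafka.ssl.truststore.password": dbutils.secrets.get(
        scope=kafka_secret_scope,
        key=product_kafka_config["truststore_pwd_secret_key"],
    ),
    "kafka.ssl.keystore.location": product_kafka_config["kafka_ssl_keystore_location"],
    "kafka.ssl.keystore.password": dbutils.secrets.get(
        scope=kafka_secret_scope,
        key=product_kafka_config["keystore_pwd_secret_key"],
    ),
}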
Kafka Data Feed CSV Configuration Entry¶
To see how the entry for a Kafka source should look in the Heartbeat Control Table, check it here.
Additional Requirements for Kafka:
The `sensor_id` follows a specific naming convention because multiple data products can use the same configuration file with different Kafka configuration values:

- The value of the `sensor_id` is the Kafka topic name starting with `<product_name>:` (or any other prefix), e.g., `my_product: my.topic`.
- How does it work? → Heartbeat receives a dictionary containing all Kafka configurations by product, which is passed as `kafka_configs` in the ACON. It then segregates the config based on the `sensor_id` value present in the Heartbeat Control Table: Heartbeat splits the `sensor_id` on the colon (`:`); the first part is taken as the product name (in our case, `my_product`) and the second part as the Kafka topic name (in our case, `my.topic`). Finally, it uses the product-related Kafka config from `kafka_configs`, as sketched below.
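As a rough illustration of the convention described above (this only mirrors the behaviour, it is not the engine's actual code), the split works like this:

# Illustrative only: mirrors the sensor_id splitting behaviour described above.
kafka_configs = {
    "my_product": {
        "kafka_bootstrap_servers_list": "KAFKA_SERVER",
        # ... remaining product-specific Kafka settings ...
    },
}

sensor_id = "my_product: my.topic"

# Split on the first colon: left side is the product name, right side the topic.
product_name, topic_name = [part.strip() for part in sensor_id.split(":", 1)]

product_kafka_config = kafka_configs[product_name]  # config for "my_product"
print(topic_name)  # my.topic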
Code sample of listener and trigger¶
from lakehouse_engine.engine import (
execute_sensor_heartbeat,
trigger_heartbeat_sensor_jobs,
)
# Kafka configurations for the product; we strongly recommend reading these values from an external configuration file.
kafka_configs = {
"my_product": {
"kafka_bootstrap_servers_list": "KAFKA_SERVER",
"kafka_ssl_truststore_location": "TRUSTSTORE_LOCATION",
"truststore_pwd_secret_key": "TRUSTSTORE_PWD",
"kafka_ssl_keystore_location": "KEYSTORE_LOCATION",
"keystore_pwd_secret_key": "KEYSTORE_PWD"
}
}
# Create an ACON dictionary for all Kafka source entries.
# This ACON dictionary is useful for passing parameters to the Heartbeat sensors.
heartbeat_sensor_config_acon = {
"sensor_source": "kafka",
"data_format": "kafka",
"heartbeat_sensor_db_table": "my_database.heartbeat_sensor",
"lakehouse_engine_sensor_db_table": "my_database.lakehouse_engine_sensors",
"options": {
"readChangeFeed": "true",
},
"kafka_configs": kafka_configs,
"kafka_secret_scope": "DB_SECRET_SCOPE",
"base_checkpoint_location": "s3://my_data_product_bucket/checkpoints",
"domain": "DATABRICKS_WORKFLOWS_DOMAIN",
"token": "DATABRICKS_WORKFLOWS_TOKEN",
}
# Execute the Heartbeat sensor and trigger the jobs that have acquired new data.
execute_sensor_heartbeat(acon=heartbeat_sensor_config_acon)
trigger_heartbeat_sensor_jobs(heartbeat_sensor_config_acon)
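As recommended in the comment above, the Kafka configurations should not be hard-coded. A minimal sketch of loading them from an external file, assuming a hypothetical JSON file with the same per-product structure as the `kafka_configs` dictionary used in the sample:

import json

# Hypothetical example: the file path and name are assumptions for illustration.
# The JSON file holds the same per-product structure as kafka_configs above.
with open("/dbfs/configs/kafka_configs.json", "r") as config_file:
    kafka_configs = json.load(config_file)

# The loaded dictionary is then passed as "kafka_configs" in the ACON,
# exactly as in the code sample above.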