Building upon the sending and receiving of messages in Data Ingestion with Azure Event Hubs using Python, let’s explore how to make them meaningful using structured data.
With the End in Mind
Whatever we believe to be true typically goes out the door once our products hit the real world. If our extraction and transformation steps are tightly coupled, it is difficult to reprocess the data effectively.
Preserve the Original Data
Whenever possible, preserving the original data allows it to be reprocessed without having to recapture the entire data set! Resist the urge to transform your data at this point. Keeping extraction separate from transformation is what lets us decouple our architecture.
Meta-Data
Pulling out some meta-data, such as a record identity, is necessary, but we must make sure we have a unique delimiter (one that is unlikely to appear in the data itself) to tokenize the message. I recently used |~| as one for my ingestion pipeline. In addition, a record type will help us implement a command design pattern for processing each individual event. Additional attributes may be added to assist with the transformation of the data, such as a profile name when the message relates to a particular profile.
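To make the command design pattern concrete, here is a minimal sketch of dispatching on the record type. The handler names, the record types, and the returned strings are all hypothetical and only illustrate the idea; a real pipeline would register whatever handlers its own event types need.

```python
from typing import Callable, Dict


# Hypothetical handlers: the names and return values are illustrative only.
def handle_tweet(payload: str) -> str:
    return f'tweet handled: {payload}'


def handle_profile_update(payload: str) -> str:
    return f'profile update handled: {payload}'


# Maps a record type to the command that processes it.
HANDLERS: Dict[str, Callable[[str], str]] = {
    'tweet': handle_tweet,
    'profile_update': handle_profile_update,
}


def dispatch(record_type: str, payload: str) -> str:
    """Look up the handler for a record type and run it on the payload."""
    handler = HANDLERS.get(record_type)
    if handler is None:
        raise ValueError(f'no handler registered for record type {record_type!r}')
    return handler(payload)
```

Supporting a new kind of event then means registering one more handler, without touching the code that reads events off the stream.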
Building a Structured Message in Python
Given our meta-data attributes:
- record identity => id
- record type => type
- profile name => profile
Given our unique delimiter: |~|
This short Python function ensures that messages are sent in a format that is easy for us to transform into meaningful inputs for the rest of our system.
data_delimiter = '|~|'


def build_message(profile: str, message_type: str,
                  event_id: str, message: str,
                  *, delimiter: str = data_delimiter) -> str:
    # Meta-data fields come first; the original payload goes last, untouched.
    structured_message = \
        f'profile:{profile}{delimiter}' \
        f'type:{message_type}{delimiter}' \
        f'id:{event_id}{delimiter}' \
        f'message:{message}'
    return structured_message


build_message('dev3l', 'tweet', '1', 'hello, world')
# => profile:dev3l|~|type:tweet|~|id:1|~|message:hello, world
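On the receiving side, the same delimiter lets us tokenize the message back into its parts. The following is a rough sketch, not part of the original pipeline: `parse_message` and `DELIMITER` are hypothetical names, and it assumes the field order shown above (profile, type, id, message) and that the meta-data values themselves never contain the delimiter.

```python
DELIMITER = '|~|'  # assumed to match the delimiter used when building the message


def parse_message(structured_message: str, *, delimiter: str = DELIMITER) -> dict:
    """Split a structured message into a dict of its named fields."""
    # Split at most three times: the first three delimiters separate the
    # meta-data, and everything after them is the original payload, which
    # is left intact even if it happens to contain the delimiter.
    parts = structured_message.split(delimiter, 3)
    if len(parts) != 4:
        raise ValueError('expected four delimited fields')
    fields = {}
    for part in parts:
        # Each part looks like "name:value"; split on the first colon only.
        key, _, value = part.partition(':')
        fields[key] = value
    return fields
```

Because the split is capped, the payload stays byte-for-byte what was sent, which keeps the original data available for reprocessing later.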
Conclusion
With a well-structured message, we can easily process our events in real time OR the entire event stream when the time comes! In our next adventure, we will manually process some of our messages and store them in a persistent store, locally in MongoDB and later in the cloud using Azure Cosmos DB.