I am running the event hub receiver implementation from: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-python-get-started-send#create-a-python-script-to-receive-events and I haven't changed anything except the connection strings etc.
After creating hundreds of events, I can see that the receiver created checkpoint folder inside the storage account, but as I run the receiver again I see that it processes the same events.
The files that are created per partitions also are empty.
The storage is provided in the consumerClient:
checkpoint_store = BlobCheckpointStore.from_connection_string("...", "eventhubcontainer") client = EventHubConsumerClient.from_connection_string("...", consumer_group="$Default", eventhub_name="eventhub1", checkpoint_store=checkpoint_store)
Also after reading the event there is method saving the checkpoint:
await partition_context.update_checkpoint(event)
Am I missing something here?
The whole code:
import asyncio from azure.eventhub.aio import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore async def on_event(partition_context, event): print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id)) await partition_context.update_checkpoint(event) async def main(): checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME") client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store) async with client: await client.receive(on_event=on_event, starting_position="-1") if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main())
https://stackoverflow.com/questions/66550651/event-hub-checkpoint-data-is-not-saved March 10, 2021 at 12:15AM
没有评论:
发表评论