Change feed
The Cosmos DB change feed is a persistent record of all committed inserts and updates, in the order that they occurred. These changes can be listened to by consumers and processed asynchronously.
Delete operations are not logged. To get around this you can "soft" delete and set a TTL on the item to eventually actually delete it. This allows the change feed to be aware of the deletion.
Push or pull
Change feed supports both push and pull models. The push model is recommended best practice for convenience. There are some situations where the pull model may be preferred:
- Reading from a particular partition
- Throttling incoming changes
- Non real-time work (data migration)
Using push
There are two ways to consume the change feed via push:
Change feed processor libraryFunctions: Cosmos DB triggers
The Azure functions Cosmos DB trigger uses the change feed processor library under the hood, so they are actually the same, the Function trigger is just a convenience wrapper.
Change feed processor library
Since the function trigger uses this library under the hood, we'll start with that. The change feed processor is made up of four components:
Monitored container
The container that is watched for changes (inserts/updates)Lease container
State storage, coordinates processing across multiple worker instances in parallelCompute instance
Compute power is required to listen for changes in the feed, this might be one of:VMK8s podApp service instancePhysical machine
Delegate
What you want to be done with consumed changes from the feed
Setup
var processor = cosmosClient
.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(
processorName: "changeFeedSample",
onChangesDelegate: HandleChangesAsync
)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
await processor.StartAsync();
processorNameis a descriptive name for what the feed processor is trying to achieveonChangesDelegateis a function with the delegate implementationWithInstanceName()defines a unique name for this worker instanceWithLeaseContainer()defines the lease container for this change feed
Function trigger
The following code snippet is how a change feed processor can be configured more simply using an Azure function trigger:
[Function("CosmosTrigger")]
public void Run([CosmosDBTrigger(
databaseName: "ToDoItems",
containerName:"TriggerItems",
Connection = "CosmosDBConnection",
LeaseContainerName = "leases",
CreateLeaseContainerIfNotExists = true)] IReadOnlyList<ToDoItem> todoItems,
FunctionContext context)
{
if (todoItems is not null && todoItems.Any())
{
foreach (var doc in todoItems)
{
_logger.LogInformation("ToDoItem: {desc}", doc.Description);
}
}
}
- Listens for updates to
ToDoItems::TriggerItemscontainer