MongoDB 4.4.4 change stream returns all document updates rather than filtered condition matched one?

I am looking for notifications of a single document’s ‘update’ operation within a collection, supposedly when that particular document gets updated MongoDB would notify a ‘watch’ registered .net client, but the reality is MongoDB returns all documents with ‘update’ operations in the collection, no matter how the “match” condition of filter sets.

Can anyone with Change Stream experience help? Is it the nature/design of MongoDB change stream?

Below is the test code pieces of .net C# client,

The class:

public class UserInfo
{
        [BsonId, BsonRepresentation(BsonType.ObjectId)]
        public string Id { get; set; }

        [BsonElement("UserName", Order = 1), BsonRepresentation(BsonType.String)]
        public string UserName { get; set; }

        [BsonElement("Password", Order = 2), BsonRepresentation(BsonType.String)]
        public string Password { get; set; }

        [BsonElement("LastName", Order = 3), BsonRepresentation(BsonType.String)]
        public string LastName { get; set; }

        [BsonElement("FirstName", Order = 4), BsonRepresentation(BsonType.String)]
        public string FirstName { get; set; }

        [BsonElement("Email", Order = 5), BsonRepresentation(BsonType.String)]
        public string Email { get; set; }
}

Pipeline of filter conditions:

collection = myDB.GetCollection<UserInfo>("Users");
var options = new ChangeStreamOptions
            {
                FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
            };
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<UserInfo>>();
pipeline.Match(g => g.OperationType == ChangeStreamOperationType.Update)
        .Match(o => o.FullDocument.UserName.Contains("Alice"))
        .Match(t => t.UpdateDescription.UpdatedFields.ToString().Contains(nameof(UserInfo.Password)));

Task watchTask = WatchCollection();

Change event process routine:

private static async Task WatchCollection()
{
            var cursor =  await collection.WatchAsync(pipeline, options);
            Debug.WriteLine("ChangeStream started.");
            
            await cursor.ForEachAsync(change =>
            {
                Debug.WriteLine("Matched UserName: " + change.FullDocument.UserName);
            });
}

From the above routine, change events aren’t working according to the filter conditions, every time this routine gets triggered, ‘change.FullDocument.UserName’ actually prints out every document in the collection that gets ‘update’ operation, which is quite weird.

Answer

i was able to get it to work with the following:

var options = new ChangeStreamOptions
{
    FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
    BatchSize = 1
};

var filter = Builders<ChangeStreamDocument<UserInfo>>
    .Filter.Where(x =>
        x.OperationType == ChangeStreamOperationType.Update &&
        x.FullDocument.UserName.Contains("Alice"));

filter &= Builders<ChangeStreamDocument<UserInfo>>.Filter.Exists("FullDocument.Password");

var pipeline = new IPipelineStageDefinition[]
{
    PipelineStageDefinitionBuilder.Match(filter)
};

using (var cursor = await collection.WatchAsync<ChangeStreamDocument<UserInfo>>(pipeline, options))
{
    while (await cursor.MoveNextAsync())
    {
        foreach (var info in cursor.Current)
        {
            Console.WriteLine("Updated: " + info.FullDocument.UserName);
        }
    }
}

Leave a Reply

Your email address will not be published. Required fields are marked *