diff --git a/airflow/providers/mongo/hooks/mongo.py b/airflow/providers/mongo/hooks/mongo.py index fca855a51d98d..928618a9dc4c6 100644 --- a/airflow/providers/mongo/hooks/mongo.py +++ b/airflow/providers/mongo/hooks/mongo.py @@ -366,3 +366,27 @@ def delete_many( collection = self.get_collection(mongo_collection, mongo_db=mongo_db) return collection.delete_many(filter_doc, **kwargs) + + def distinct( + self, + mongo_collection: str, + distinct_key: str, + filter_doc: dict | None = None, + mongo_db: str | None = None, + **kwargs, + ) -> list[Any]: + """ + Returns a list of distinct values for the given key across a collection. + + https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.distinct + + :param mongo_collection: The name of the collection to perform distinct on. + :param distinct_key: The field to return distinct values from. + :param filter_doc: A query that matches the documents to get distinct values from. + Can be omitted; then the entire collection is covered. + :param mongo_db: The name of the database to use. + Can be omitted; then the database from the connection string is used. 
+ """ + collection = self.get_collection(mongo_collection, mongo_db=mongo_db) + + return collection.distinct(distinct_key, filter=filter_doc, **kwargs) diff --git a/tests/providers/mongo/hooks/test_mongo.py b/tests/providers/mongo/hooks/test_mongo.py index 4d46a613e755d..19aa4928fc092 100644 --- a/tests/providers/mongo/hooks/test_mongo.py +++ b/tests/providers/mongo/hooks/test_mongo.py @@ -303,6 +303,32 @@ def test_aggregate(self): results = self.hook.aggregate(collection, aggregate_query) assert len(list(results)) == 2 + def test_distinct(self): + collection = mongomock.MongoClient().db.collection + objs = [ + {"test_id": "1", "test_status": "success"}, + {"test_id": "2", "test_status": "failure"}, + {"test_id": "3", "test_status": "success"}, + ] + + collection.insert_many(objs) + + results = self.hook.distinct(collection, "test_status") + assert len(results) == 2 + + def test_distinct_with_filter(self): + collection = mongomock.MongoClient().db.collection + objs = [ + {"test_id": "1", "test_status": "success"}, + {"test_id": "2", "test_status": "failure"}, + {"test_id": "3", "test_status": "success"}, + ] + + collection.insert_many(objs) + + results = self.hook.distinct(collection, "test_id", {"test_status": "failure"}) + assert len(results) == 1 + def test_context_manager(): with MongoHook(conn_id="mongo_default", mongo_db="default") as ctx_hook: