Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Salesforce bulk api #24473

Merged
merged 27 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b8b818f
add support for Salesforce bulk api
ishiis Jun 15, 2022
c187cd4
fix a format
ishiis Jun 16, 2022
61d194b
add salesforce_bulk to provider.yaml
ishiis Jun 16, 2022
abd6144
add support for Salesforce bulk api
ishiis Jun 16, 2022
b2a7e1d
fix a code format and typo
ishiis Jun 16, 2022
86a8665
fix a typo
ishiis Jun 20, 2022
41239b1
move the validation to else clause
ishiis Jun 20, 2022
b119a45
add description
ishiis Jun 20, 2022
bca13b7
fix task_id for ci
ishiis Jun 21, 2022
8ddd945
move the validation to constructor and add validation for object_name
ishiis Jun 21, 2022
c2489cb
fix task_id for ci
ishiis Jun 21, 2022
afdf481
remove the return empty dict
ishiis Jun 21, 2022
f59daec
add hard_delete operation
ishiis Jun 21, 2022
fa0d0c2
add return statement
ishiis Jun 21, 2022
dc081cf
rename salesforce_bulk to bulk
ishiis Jun 22, 2022
4c16723
change AirflowException to ValueError
ishiis Jun 22, 2022
806f509
add :dedent: param for rendering the code snippet
ishiis Jun 22, 2022
61b39c0
remove unittest.TestCase
ishiis Jun 22, 2022
fe70ea8
fix a typo
ishiis Jun 25, 2022
7c502a9
Merge branch 'main' into add-salesforce-bulk-operator
ishiis Jun 25, 2022
f0fb5c3
Merge branch 'main' into add-salesforce-bulk-operator
ishiis Jun 27, 2022
88ff186
fix conflicts
ishiis Jul 4, 2022
c7cf1fc
change to return None if nothing to push
ishiis Jul 4, 2022
620f429
change to return None if nothing to push
ishiis Jul 4, 2022
0364bf6
remove return type
ishiis Jul 4, 2022
3e095bb
fix document
ishiis Jul 4, 2022
2a507bb
fix type of result
ishiis Jul 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions airflow/providers/salesforce/operators/salesforce_bulk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Licensed to the Apache Software Foundation (ASF) under one
ishiis marked this conversation as resolved.
Show resolved Hide resolved
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.salesforce.hooks.salesforce import SalesforceHook

if TYPE_CHECKING:
from airflow.utils.context import Context


class SalesforceBulkOperator(BaseOperator):
    """
    Execute a Salesforce Bulk API operation and return its result for XCom.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:SalesforceBulkOperator`

    :param operation: Bulk operation to be performed; one of ``insert``, ``update``,
        ``upsert`` or ``delete``
    :param object_name: The name of the Salesforce object
    :param payload: list of dict to be passed as a batch
    :param external_id_field: unique identifier field for upsert operations
    :param batch_size: number of records to assign for each batch in the job
    :param use_serial: Process batches in serial mode
    :param salesforce_conn_id: The :ref:`Salesforce Connection id <howto/connection:SalesforceHook>`.
    """

    def __init__(
        self,
        *,
        operation: str,
        object_name: str,
        payload: list,
        external_id_field: str = 'Id',
        batch_size: int = 10000,
        use_serial: bool = False,
        salesforce_conn_id: str = 'salesforce_default',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.operation = operation
        self.object_name = object_name
        self.payload = payload
        self.external_id_field = external_id_field
        self.batch_size = batch_size
        self.use_serial = use_serial
        self.salesforce_conn_id = salesforce_conn_id

    def execute(self, context: 'Context') -> dict:
        """
        Run the configured operation against the Salesforce Bulk API.

        :param context: The task context during execution.
        :return: API response if do_xcom_push is True, otherwise an empty dict
        :rtype: dict
        :raises AirflowException: if ``operation`` is not one of the supported operations
        """
        # NOTE(review): the ``dict`` return annotation is kept for compatibility; confirm it
        # matches what the bulk call actually returns.
        available_operations = ['insert', 'update', 'upsert', 'delete']
        # Validate before opening a connection so a bad operation fails fast, and so that
        # only allow-listed names are ever used for the dynamic method lookup below.
        if self.operation not in available_operations:
            raise AirflowException(f'Operation not found! Available operations are {available_operations}')

        sf_hook = SalesforceHook(salesforce_conn_id=self.salesforce_conn_id)
        conn = sf_hook.get_conn()

        # Every operation takes the same three arguments; only upsert needs the external id.
        call_kwargs = {
            'data': self.payload,
            'batch_size': self.batch_size,
            'use_serial': self.use_serial,
        }
        if self.operation == 'upsert':
            call_kwargs['external_id_field'] = self.external_id_field

        # The bulk handler exposes Salesforce objects as dynamic attributes.
        bulk_object = getattr(conn.bulk, self.object_name)
        result = getattr(bulk_object, self.operation)(**call_kwargs)

        if self.do_xcom_push:
            return result
        return {}
ishiis marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions airflow/providers/salesforce/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ integrations:
external-doc-url: https://www.salesforce.com/
how-to-guide:
- /docs/apache-airflow-providers-salesforce/operators/salesforce_apex_rest.rst
- /docs/apache-airflow-providers-salesforce/operators/salesforce_bulk.rst
logo: /integration-logos/salesforce/Salesforce.png
tags: [service]

operators:
- integration-name: Salesforce
python-modules:
- airflow.providers.salesforce.operators.salesforce_apex_rest
- airflow.providers.salesforce.operators.salesforce_bulk
- airflow.providers.salesforce.operators.tableau_refresh_workbook

sensors:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ Salesforce Operators
:maxdepth: 1

salesforce_apex_rest
salesforce_bulk
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

.. _howto/operator:SalesforceBulkOperator:


SalesforceBulkOperator
======================

Use the :class:`~airflow.providers.salesforce.operators.salesforce_bulk.SalesforceBulkOperator` to execute Salesforce Bulk API operations (insert, update, upsert and delete).

Using the Operator
^^^^^^^^^^^^^^^^^^

You can use this operator to access the Bulk Insert API:

.. exampleinclude:: /../../tests/system/providers/salesforce/example_salesforce_bulk.py
:language: python
ishiis marked this conversation as resolved.
Show resolved Hide resolved
:start-after: [START howto_salesforce_bulk_insert_operator]
:end-before: [END howto_salesforce_bulk_insert_operator]

You can use this operator to access the Bulk Update API:

.. exampleinclude:: /../../tests/system/providers/salesforce/example_salesforce_bulk.py
:language: python
ishiis marked this conversation as resolved.
Show resolved Hide resolved
:start-after: [START howto_salesforce_bulk_update_operator]
:end-before: [END howto_salesforce_bulk_update_operator]

You can use this operator to access the Bulk Upsert API:

.. exampleinclude:: /../../tests/system/providers/salesforce/example_salesforce_bulk.py
:language: python
ishiis marked this conversation as resolved.
Show resolved Hide resolved
:start-after: [START howto_salesforce_bulk_upsert_operator]
:end-before: [END howto_salesforce_bulk_upsert_operator]

You can use this operator to access the Bulk Delete API:

.. exampleinclude:: /../../tests/system/providers/salesforce/example_salesforce_bulk.py
:language: python
ishiis marked this conversation as resolved.
Show resolved Hide resolved
:start-after: [START howto_salesforce_bulk_delete_operator]
:end-before: [END howto_salesforce_bulk_delete_operator]
179 changes: 179 additions & 0 deletions tests/providers/salesforce/operators/test_salesforce_bulk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
from unittest.mock import Mock, patch

import pytest

from airflow.exceptions import AirflowException
from airflow.providers.salesforce.operators.salesforce_bulk import SalesforceBulkOperator


class TestSalesforceBulkOperator(unittest.TestCase):
    """
    Test class for SalesforceBulkOperator
    """

    def _check_bulk_call(self, mock_get_conn, operation, payload, **extra_kwargs):
        """
        Run the operator for ``operation`` and assert the matching bulk method was called once.

        :param mock_get_conn: patched ``SalesforceHook.get_conn``
        :param operation: bulk operation under test; also used to build the task id
        :param payload: records passed to the operator and expected in the bulk call
        :param extra_kwargs: additional arguments given to the operator and expected,
            unchanged, in the bulk call (e.g. ``external_id_field`` for upsert)
        """
        object_name = 'Account'
        batch_size = 10000
        use_serial = True

        # Install a dedicated mock as the bulk method so the assertion targets exactly
        # the callable the operator is expected to invoke.
        bulk_method = Mock()
        setattr(getattr(mock_get_conn.return_value.bulk, object_name), operation, bulk_method)

        operator = SalesforceBulkOperator(
            task_id=f'bulk_{operation}',
            operation=operation,
            object_name=object_name,
            payload=payload,
            batch_size=batch_size,
            use_serial=use_serial,
            **extra_kwargs,
        )

        operator.execute(context={})

        bulk_method.assert_called_once_with(
            data=payload,
            batch_size=batch_size,
            use_serial=use_serial,
            **extra_kwargs,
        )

    def test_execute_missing_operation(self):
        """
        Test execute missing operation
        """
        operator = SalesforceBulkOperator(
            task_id='missing_operation',
            operation='operation',
            object_name='Account',
            payload=[],
        )

        with pytest.raises(AirflowException):
            operator.execute({})

    # Patch the hook where the module under test looks it up, not via an unrelated operator module.
    @patch('airflow.providers.salesforce.operators.salesforce_bulk.SalesforceHook.get_conn')
    def test_execute_salesforce_bulk_insert(self, mock_get_conn):
        """
        Test execute bulk insert
        """
        payload = [
            {'Name': 'account1'},
            {'Name': 'account2'},
        ]
        self._check_bulk_call(mock_get_conn, 'insert', payload)

    @patch('airflow.providers.salesforce.operators.salesforce_bulk.SalesforceHook.get_conn')
    def test_execute_salesforce_bulk_update(self, mock_get_conn):
        """
        Test execute bulk update
        """
        payload = [
            {'Id': '000000000000000AAA', 'Name': 'account1'},
            {'Id': '000000000000000BBB', 'Name': 'account2'},
        ]
        self._check_bulk_call(mock_get_conn, 'update', payload)

    @patch('airflow.providers.salesforce.operators.salesforce_bulk.SalesforceHook.get_conn')
    def test_execute_salesforce_bulk_upsert(self, mock_get_conn):
        """
        Test execute bulk upsert
        """
        payload = [
            {'Id': '000000000000000AAA', 'Name': 'account1'},
            {'Name': 'account2'},
        ]
        self._check_bulk_call(mock_get_conn, 'upsert', payload, external_id_field='Id')

    @patch('airflow.providers.salesforce.operators.salesforce_bulk.SalesforceHook.get_conn')
    def test_execute_salesforce_bulk_delete(self, mock_get_conn):
        """
        Test execute bulk delete
        """
        payload = [
            {'Id': '000000000000000AAA'},
            {'Id': '000000000000000BBB'},
        ]
        self._check_bulk_call(mock_get_conn, 'delete', payload)
Loading