This repository has been archived by the owner on Feb 16, 2024. It is now read-only.
forked from heroku/salesforce-bulk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_salesforce_bulk.py
147 lines (109 loc) · 4.83 KB
/
test_salesforce_bulk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import re
import unittest
import salesforce_oauth_request
from salesforce_bulk import SalesforceBulk
SALESFORCE_API_VERSION = "51.0"
class SalesforceBulkTest(unittest.TestCase):
def __init__(self, testName, endpoint, sessionId):
super(SalesforceBulkTest, self).__init__(testName)
self.endpoint = endpoint
self.sessionId = sessionId
def setUp(self):
self.jobs = []
def tearDown(self):
if hasattr(self, 'bulk'):
for job_id in self.jobs:
print "Closing job: %s" % job_id
self.bulk.close_job(job_id)
def test_raw_query(self):
bulk = SalesforceBulk(SALESFORCE_API_VERSION, self.sessionId, self.endpoint)
self.bulk = bulk
job_id = bulk.create_query_job("Contact")
self.jobs.append(job_id)
self.assertIsNotNone(re.match("\w+", job_id))
batch_id = bulk.query(job_id, "Select Id,Name,Email from Contact Limit 1000")
self.assertIsNotNone(re.match("\w+", batch_id))
while not bulk.is_batch_done(job_id, batch_id):
print "Job not done yet..."
print bulk.batch_status(job_id, batch_id)
time.sleep(2)
self.results = ""
def save_results(tfile, **kwargs):
print "in save results"
self.results = tfile.read()
flag = bulk.get_batch_results(job_id, batch_id, callback = save_results)
self.assertTrue(flag)
self.assertTrue(len(self.results) > 0)
self.assertIn('"', self.results)
def test_csv_query(self):
bulk = SalesforceBulk(SALESFORCE_API_VERSION, self.sessionId, self.endpoint)
self.bulk = bulk
job_id = bulk.create_query_job("Account")
self.jobs.append(job_id)
self.assertIsNotNone(re.match("\w+", job_id))
batch_id = bulk.query(job_id, "Select Id,Name,Description from Account Limit 10000")
self.assertIsNotNone(re.match("\w+", batch_id))
bulk.wait_for_batch(job_id, batch_id, timeout=120)
self.results = None
def save_results1(rows, **kwargs):
self.results = rows
flag = bulk.get_batch_results(job_id, batch_id, callback = save_results1, parse_csv=True)
self.assertTrue(flag)
results = self.results
self.assertTrue(len(results) > 0)
self.assertTrue(isinstance(results,list))
self.assertEqual(results[0], ['Id','Name','Description'])
self.assertTrue(len(results) > 3)
self.results = None
self.callback_count = 0
def save_results2(rows, **kwargs):
self.results = rows
print rows
self.callback_count += 1
batch = len(results) / 3
self.callback_count = 0
flag = bulk.get_batch_results(job_id, batch_id, callback = save_results2, parse_csv=True, batch_size=batch)
self.assertTrue(self.callback_count >= 3)
def test_csv_upload(self):
bulk = SalesforceBulk(SALESFORCE_API_VERSION, self.sessionId, self.endpoint)
self.bulk = bulk
job_id = bulk.create_insert_job("Contact")
self.jobs.append(job_id)
self.assertIsNotNone(re.match("\w+", job_id))
batch_ids = []
content = open("example.csv").read()
for i in range(5):
batch_id = bulk.query(job_id, content)
self.assertIsNotNone(re.match("\w+", batch_id))
batch_ids.append(batch_id)
for batch_id in batch_ids:
bulk.wait_for_batch(job_id, batch_id, timeout=120)
self.results = None
def save_results1(rows, failed, remaining):
self.results = rows
for batch_id in batch_ids:
flag = bulk.get_upload_results(job_id, batch_id, callback = save_results1)
self.assertTrue(flag)
results = self.results
self.assertTrue(len(results) > 0)
self.assertTrue(isinstance(results,list))
self.assertEqual(results[0], UploadResult('Id','Success','Created','Error'))
self.assertEqual(len(results), 3)
self.results = None
self.callback_count = 0
def save_results2(rows, failed, remaining):
self.results = rows
self.callback_count += 1
batch = len(results) / 3
self.callback_count = 0
flag = bulk.get_upload_results(job_id, batch_id, callback = save_results2, batch_size=batch)
self.assertTrue(self.callback_count >= 3)
if __name__ == '__main__':
username = raw_input("Salesforce username: ")
password = raw_input("Salesforce password: ")
login = salesforce_oauth_request.login(username=username, password=password, cache_session=True)
endpoint = login['endpoint']
sessionId = login['access_token']
suite = unittest.TestSuite()
suite.addTest(SalesforceBulkTest("test_csv_upload", endpoint, sessionId))
unittest.TextTestRunner().run(suite)