forked from salesforce/CodeRL
-
Notifications
You must be signed in to change notification settings - Fork 0
/
generate.py
220 lines (176 loc) · 8.63 KB
/
generate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
#
# Copyright (c) 2022, salesforce.com, inc.
# All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
# For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
#
import json
import os
import pprint
import torch
import pdb
import glob
from tqdm import tqdm
import pickle as pkl
import numpy as np
from collections import Counter
from transformers import RobertaTokenizer, T5ForConditionalGeneration
import datasets.utils as dsutils
def generate_prompt(args, test_case_path, prompt_path, solutions_path, tokenizer,
                    starter_path=None):
    """Build the text prompt for a single problem.

    The prompt consists of a ``QUESTION:`` header, the problem statement,
    the optional starter code, a one-line hint naming the expected I/O
    format, and a trailing ``ANSWER:`` marker.

    Args:
        args: Not read by this function; kept so the signature matches the
            other input builders in this file.
        test_case_path: Path to the problem's test-case JSON file. It may
            not exist.
        prompt_path: Path to the text file holding the problem statement.
        solutions_path: Not read by this function.
        tokenizer: Not read by this function.
        starter_path: Optional path to a starter-code file. When given, its
            contents are appended after the problem statement.

    Returns:
        The assembled prompt string.
    """
    parts = ["\nQUESTION:\n"]

    # Files are decoded as UTF-8 explicitly so the prompt does not depend on
    # the locale's default encoding.
    with open(prompt_path, "r", encoding="utf-8") as f:
        parts.append(f.read())

    if starter_path is not None:
        with open(starter_path, "r", encoding="utf-8") as f:
            parts.append("\n" + f.read())

    if os.path.exists(test_case_path):
        # The test-case file is authoritative: a non-empty "fn_name" entry
        # selects the call-based format, anything else standard input.
        with open(test_case_path, "r", encoding="utf-8") as f:
            test_cases = json.load(f)
        call_based = bool(test_cases.get("fn_name"))
    else:
        # Without test cases, the presence of starter code decides.
        call_based = starter_path is not None and os.path.exists(starter_path)

    if call_based:
        parts.append("\nUse Call-Based format")
    else:
        parts.append("\nUse Standard Input format")

    parts.append("\nANSWER:\n")
    return "".join(parts)
def generate_critic_inputs(args, test_case_path, prompt_path, solutions_path, tokenizer,
                           starter_path=None, gt_solutions=False):
    """Build per-solution critic inputs for a single problem.

    Args:
        args: Namespace providing ``source_len`` (prompt token length),
            ``max_len`` (code token length) and ``binary_prediction``
            (forwarded to ``dsutils.get_error_type``).
        test_case_path: Forwarded to ``generate_prompt``.
        prompt_path: Forwarded to ``generate_prompt``.
        solutions_path: Path to a JSON file containing the list of solutions.
        tokenizer: Object exposing ``encode`` and ``eos_token_id``.
        starter_path: Forwarded to ``generate_prompt``.
        gt_solutions: When true, each entry of the solutions file is used
            directly as the program text and is labelled via
            ``get_error_type(True, ...)``. Otherwise each entry is indexed
            with ``'code'`` and ``'result'``.

    Returns:
        A tuple ``(all_texts, all_codes, gt_errors)`` of three lists with one
        element per solution: the fixed-length prompt token list (the same
        list object is repeated for every solution), the fixed-length code
        token list, and the value returned by ``dsutils.get_error_type``.
    """
    prompt = generate_prompt(args, test_case_path, prompt_path, solutions_path, tokenizer, starter_path)

    # Pad the prompt tokens with EOS up to source_len, then clip in case the
    # tokenizer returned more than source_len tokens.
    q_tokens = tokenizer.encode(prompt, verbose=False, max_length=args.source_len)
    in_tokens = [tokenizer.eos_token_id] * args.source_len
    in_tokens[:len(q_tokens)] = q_tokens
    in_tokens = in_tokens[:args.source_len]

    # Use a context manager so the file handle is closed deterministically.
    with open(solutions_path, "r", encoding="utf-8") as f:
        solutions = json.load(f)

    all_texts = []
    gt_errors = []
    all_codes = []

    for solution in solutions:
        if gt_solutions:
            raw_code, outcome = solution, True
        else:
            raw_code, outcome = solution['code'], solution['result']

        a_tokens = tokenizer.encode(dsutils.reindent_code(raw_code))

        # Pad the code tokens with -100 up to max_len, then clip.
        # NOTE(review): -100 is presumably the label value ignored by the
        # loss -- confirm against the model code.
        code = [-100] * args.max_len
        code[:len(a_tokens)] = a_tokens
        code = code[:args.max_len]

        all_texts.append(in_tokens)
        all_codes.append(code)
        gt_errors.append(dsutils.get_error_type(outcome, binary=args.binary_prediction))

    return all_texts, all_codes, gt_errors
def main(args):
    """Sample programs, or compute critic scores, for a slice of problems.

    Every entry under ``args.test_path`` is treated as one problem directory
    whose final path component is an integer id. The sorted problem list is
    sliced with ``args.start`` / ``args.end`` and each problem is processed
    in one of two modes:

    * ``args.critic_scores`` true: the model is run on the problem's
      solutions (``solutions.json`` when ``args.gt_solutions`` is true,
      ``gen_solutions.json`` otherwise) and the predictions plus hidden
      states are pickled into the problem directory. The overall
      prediction accuracy and label distributions are printed at the end.
    * otherwise: ``args.num_seqs`` programs are sampled in batches of
      ``args.num_seqs_per_iter`` and written to
      ``<args.output_path>/<problem_id>.json``.

    Args:
        args: Namespace providing ``test_path``, ``output_path``, ``start``,
            ``end``, ``tokenizer_path``, ``model_path``, ``critic_scores``,
            ``gt_solutions``, ``source_len``, ``max_len``, ``temperature``,
            ``num_seqs`` and ``num_seqs_per_iter`` (plus whatever
            ``generate_critic_inputs`` reads).

    Returns:
        None. Returns early, after printing a message, when ``args.start``
        is outside the range of available problems.
    """
    print(pprint.pformat(vars(args)))

    problems = sorted(glob.glob(args.test_path + '/*'))

    if not os.path.exists(args.output_path):
        os.makedirs(args.output_path, exist_ok=True)
    print("Saving results to {}".format(args.output_path))

    if args.start > len(problems) or args.start < 0:
        print(f"start index {args.start} is out of range for {len(problems)} problems")
        return
    start = args.start
    if args.end is None or args.end > len(problems):
        end = len(problems)
    else:
        end = args.end
    problems = problems[start:end]

    # Set up model
    tokenizer = RobertaTokenizer.from_pretrained('Salesforce/codet5-base', cache_dir=args.tokenizer_path)
    print("Loading model from {}...".format(args.model_path))
    if args.critic_scores:
        # NOTE(review): ``tuning_mode`` (and ``return_error_hidden_states``
        # below) presumably come from the project's customised model class
        # -- confirm against the transformers build in use.
        model = T5ForConditionalGeneration.from_pretrained(args.model_path, tuning_mode='critic')
    else:
        model = T5ForConditionalGeneration.from_pretrained(args.model_path)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Accumulators for the critic-mode summary; unused in generation mode.
    all_preds = []
    all_gts = []

    # main eval loop
    for problem in tqdm(problems, ncols=0, total=len(problems)):
        prob_path = problem
        print(f"problem path = {prob_path}")

        # basename/normpath instead of split('/') so the id is extracted
        # correctly regardless of the platform's path separator.
        problem_id = int(os.path.basename(os.path.normpath(problem)))

        # NOTE: skipping problems whose outputs already exist was disabled
        # in the original script, so every problem is always reprocessed.

        test_case_path = os.path.join(prob_path, "input_output.json")
        prompt_path = os.path.join(prob_path, "question.txt")
        starter_path = os.path.join(prob_path, "starter_code.py")
        if args.critic_scores and not args.gt_solutions:
            solutions_path = os.path.join(prob_path, "gen_solutions.json")
        else:
            solutions_path = os.path.join(prob_path, "solutions.json")
        if not os.path.exists(starter_path):
            starter_path = None

        if args.critic_scores:
            input_texts, input_codes, gt_error_types = generate_critic_inputs(
                args, test_case_path, prompt_path, solutions_path,
                tokenizer, starter_path, args.gt_solutions)
        else:
            input_text = generate_prompt(args, test_case_path, prompt_path, solutions_path,
                                         tokenizer, starter_path)

        with torch.no_grad():
            if args.critic_scores:
                text_tensor = torch.tensor(input_texts).to(device)
                code_tensor = torch.tensor(input_codes).to(device)
                gt_error_tensor = torch.tensor(gt_error_types).to(device)

                curr_inputs = {'input_ids': text_tensor, 'error_types': gt_error_tensor, 'labels': code_tensor}
                _, error_preds, error_hidden_states = model(**curr_inputs, return_error_hidden_states=True)

                assert len(gt_error_types) == len(error_preds)
                all_preds.extend(error_preds.cpu().numpy().tolist())
                all_gts.extend(gt_error_types)

                saved_critic_scores = {'code': input_codes, 'prompt': input_texts,
                                       'gt_error_type': gt_error_types,
                                       'pred_error_type': error_preds.cpu().numpy(),
                                       'error_hidden_states': error_hidden_states.cpu().numpy()}

                if args.gt_solutions:
                    scores_loc = os.path.join(prob_path, "solutions_critic_scores.pkl")
                else:
                    scores_loc = os.path.join(prob_path, "gen_solutions_critic_scores.pkl")

                # Context manager so the pickle is flushed and closed before
                # moving on to the next problem.
                with open(scores_loc, 'wb') as f:
                    pkl.dump(saved_critic_scores, f)

            else:
                # Place the prompt on the same device as the model; the
                # previous unconditional .cuda() failed on CPU-only hosts
                # even though the model itself falls back to CPU.
                input_ids = torch.LongTensor(tokenizer.encode(input_text,
                                                              verbose=False,
                                                              max_length=args.source_len)).unsqueeze(0).to(device)

                num_loops = int(args.num_seqs / args.num_seqs_per_iter)
                output_programs = []
                for _ in tqdm(range(num_loops), ncols=0, total=num_loops, leave=False):
                    output_ids = model.generate(
                        input_ids,
                        do_sample=True,
                        temperature=args.temperature,
                        max_length=args.max_len,
                        num_return_sequences=args.num_seqs_per_iter,
                        top_p=0.95)

                    output_programs.extend(
                        tokenizer.decode(output_id, skip_special_tokens=True)
                        for output_id in output_ids)

                saved_codes = {problem_id: {'code': output_programs, 'prompt': input_text}}

                codes_loc = os.path.join(args.output_path, f"{problem_id}.json")
                with open(codes_loc, "w") as f:
                    json.dump(saved_codes, f)

    if args.critic_scores:
        print("Total number of samples: {}".format(len(all_gts)))
        acc = (np.array(all_preds) == np.array(all_gts)).sum()/len(all_gts)
        print("Error Pred Acc: {}".format(acc))
        print("Prediction distribution: {}".format(Counter(all_preds)))
        print("GT distribution: {}".format(Counter(all_gts)))
if __name__ == "__main__":
    # Import only the name this entry point uses instead of a wildcard
    # import, so the config module cannot silently shadow module-level
    # names in this file. The import is kept inside the guard so it runs
    # only when the file is executed as a script.
    from configs.generate_configs import args

    main(args)