# csp.py
"""CSP (Constraint Satisfaction Problems) problems and solvers. (Chapter 6)"""
import random
from collections import defaultdict, Counter
from operator import eq, neg
from sortedcontainers import SortedSet
import search
from utils import argmin_random_tie, count, first, extend
class CSP(search.Problem):
    """A finite-domain Constraint Satisfaction Problem.

    The problem is described by four inputs:
        variables    A list of variables; each is atomic (e.g. int or string).
        domains      A dict of {var: [possible_value, ...]} entries.
        neighbors    A dict of {var: [var, ...]} listing, for each variable,
                     the other variables that share a constraint with it.
        constraints  A function f(A, a, B, b) that returns true if neighbors
                     A, B satisfy the constraint when A=a and B=b.

    Textbook definitions usually enumerate the allowed value pairs
    explicitly; expressing the constraints as a function is more compact in
    most cases (n-Queens takes O(n) space this way instead of O(n^4)).

    On top of the description, the class carries the bookkeeping used by the
    solvers. In the list below 'a' is an assignment, i.e. a dict of
    {var: val} entries:
        assign(var, val, a)      Set a[var] = val and count the assignment
        unassign(var, a)         Delete a[var] if present
        nconflicts(var, val, a)  Number of assigned neighbors that conflict
                                 with var=val
        curr_domains[var]        Slot: values of var not yet ruled out; used
                                 by the constraint propagation routines

    Used only by graph_search and tree_search:
        actions(state)           Return a list of actions
        result(state, action)    Return a successor of state
        goal_test(state)         Return true if all constraints are satisfied

    For metrics and debugging:
        nassigns                 Slot: number of assignments made
        total_checks             Slot: number of checks, for result metrics
        weight                   Slot: {(A, B): weight} counters used by the
                                 dom/wdeg variable ordering
        display(a)               Print a human-readable representation
    """

    def __init__(self, variables, domains, neighbors, constraints):
        """Build the problem; an empty `variables` falls back to domains.keys()."""
        super().__init__(())
        self.variables = variables or list(domains.keys())
        self.domains = domains
        self.neighbors = neighbors
        self.constraints = constraints
        self.curr_domains = None  # created lazily by support_pruning()
        self.nassigns = 0
        self.total_checks = 0  # number of checks, for result metrics
        # One counter per directed (variable, neighbor) pair, all starting at 1.
        self.weight = {(var, nb): 1
                       for var in self.variables
                       for nb in self.neighbors[var]}

    def assign(self, var, val, assignment):
        """Record var=val in assignment (replacing any old value) and count it."""
        assignment[var] = val
        self.nassigns += 1

    def unassign(self, var, assignment):
        """Drop var from assignment if it is there.

        Not meant for changing a variable to a new value; call assign for that.
        """
        if var in assignment:
            del assignment[var]

    def nconflicts(self, var, val, assignment):
        """Count the assigned neighbors of var whose value violates the constraint with var=val."""
        # Subclasses may implement this more efficiently
        return count(other in assignment
                     and not self.constraints(var, val, other, assignment[other])
                     for other in self.neighbors[var])

    def display(self, assignment):
        """Print the assignment."""
        # Subclasses can print in a prettier way, or display with a GUI
        print(assignment)

    # These methods are for the tree and graph-search interface:

    def actions(self, state):
        """List the conflict-free (var, val) pairs for the first unassigned variable.

        Returns an empty list once state holds as many entries as there are variables.
        """
        if len(state) == len(self.variables):
            return []
        assignment = dict(state)
        var = first([v for v in self.variables if v not in assignment])
        return [(var, val)
                for val in self.domains[var]
                if self.nconflicts(var, val, assignment) == 0]

    def result(self, state, action):
        """Return state extended with the (var, val) pair carried by action."""
        var, val = action
        return state + ((var, val),)

    def goal_test(self, state):
        """Return true when every variable is assigned and none has a conflict."""
        assignment = dict(state)
        if len(assignment) != len(self.variables):
            return False
        return all(self.nconflicts(var, assignment[var], assignment) == 0
                   for var in self.variables)

    # These are for constraint propagation

    def support_pruning(self):
        """Create curr_domains on first use, as a per-variable copy of domains.

        Done lazily so the copy is only paid for when pruning is used.
        """
        if self.curr_domains is None:
            self.curr_domains = {v: list(self.domains[v]) for v in self.variables}

    def suppose(self, var, value):
        """Narrow var's current domain to [value]; return the (var, val) pairs that were dropped."""
        self.support_pruning()
        removals = [(var, other) for other in self.curr_domains[var] if other != value]
        self.curr_domains[var] = [value]
        return removals

    def prune(self, var, value, removals):
        """Remove value from var's current domain, logging it in removals unless that is None."""
        self.curr_domains[var].remove(value)
        if removals is not None:
            removals.append((var, value))

    def choices(self, var):
        """Return the values of var not currently ruled out (the full domain if nothing was pruned)."""
        source = self.curr_domains or self.domains
        return source[var]

    def infer_assignment(self):
        """Return {var: val} for every variable whose current domain holds exactly one value."""
        self.support_pruning()
        inferred = {}
        for var in self.variables:
            values = self.curr_domains[var]
            if len(values) == 1:
                inferred[var] = values[0]
        return inferred

    def restore(self, removals):
        """Put every (var, val) pair in removals back into the current domains."""
        for var, val in removals:
            self.curr_domains[var].append(val)

    # This is for min_conflicts search

    def conflicted_vars(self, current):
        """Return the variables whose value in the current assignment has at least one conflict."""
        return [v for v in self.variables
                if self.nconflicts(v, current[v], current) > 0]
# ______________________________________________________________________________
# Constraint Propagation with AC3
def no_arc_heuristic(csp, queue):
    """Arc ordering that imposes no order: hand the queue back unchanged."""
    unchanged = queue
    return unchanged
def dom_j_up(csp, queue):
    """Return the arcs as a SortedSet keyed on the negated size of the
    current domain of each arc's second variable."""

    def negated_domain_size(arc):
        return neg(len(csp.curr_domains[arc[1]]))

    return SortedSet(queue, key=negated_domain_size)
def AC3(csp, queue=None, removals=None, arc_heuristic=dom_j_up):
    """[Figure 6.3] Enforce arc consistency with AC3.

    `queue` holds the arcs (Xi, Xj) to process; when it is None every
    (variable, neighbor) arc is used. Pruned values are appended to
    `removals` (when not None) so they can be restored later.

    Returns (consistent, checks): `consistent` is False as soon as some
    domain is emptied, and `checks` is the counter accumulated by revise.
    """
    if queue is None:
        queue = {(Xi, Xk) for Xi in csp.variables for Xk in csp.neighbors[Xi]}
    csp.support_pruning()
    queue = arc_heuristic(csp, queue)
    checks = 0
    while queue:
        Xi, Xj = queue.pop()
        revised, checks = revise(csp, Xi, Xj, removals, checks)
        if not revised:
            continue
        if not csp.curr_domains[Xi]:
            return False, checks  # CSP is inconsistent
        # Xi lost values, so arcs pointing at Xi must be looked at again
        # (except the one coming from Xj).
        for Xk in csp.neighbors[Xi]:
            if Xk != Xj:
                queue.add((Xk, Xi))
    return True, checks  # CSP is satisfiable
def revise(csp, Xi, Xj, removals, checks=0):
    """Prune every value of Xi that no value of Xj supports.

    Returns (revised, checks): `revised` is true if at least one value was
    removed, and `checks` is the incoming counter plus one for every
    constraint evaluation performed here.
    """
    revised = False
    for x in list(csp.curr_domains[Xi]):
        # Look for a y in Xj's domain compatible with Xi=x; stop at the first.
        supported = False
        for y in csp.curr_domains[Xj]:
            checks += 1
            if csp.constraints(Xi, x, Xj, y):
                supported = True
                break
        if not supported:
            csp.prune(Xi, x, removals)
            revised = True
    # dom/wdeg bookkeeping: on a domain wipe-out, bump the weight of (Xi, Xj)
    if not csp.curr_domains[Xi]:
        csp.weight[(Xi, Xj)] += 1
    return revised, checks
# Constraint Propagation with AC3b: an improved version
# of AC3 with double-support domain-heuristic
def AC3b(csp, queue=None, removals=None, arc_heuristic=dom_j_up):
    """Enforce arc consistency with AC3b (AC3 with double-support checks).

    Arguments match AC3: `queue` is the set of arcs to process (all
    (variable, neighbor) arcs when None) and `removals` collects pruned
    (var, value) pairs. Returns (consistent, checks).
    """
    if queue is None:
        queue = {(Xi, Xk) for Xi in csp.variables for Xk in csp.neighbors[Xi]}
    csp.support_pruning()
    queue = arc_heuristic(csp, queue)
    checks = 0
    while queue:
        Xi, Xj = queue.pop()
        # Si_p: values of Xi known to be supported by Xj
        # Sj_p: values of Xj known to be supported by Xi
        # Sj_u: values of Xj not yet known to be supported by Xi
        Si_p, Sj_p, Sj_u, checks = partition(csp, Xi, Xj, checks)
        if not Si_p:
            return False, checks  # CSP is inconsistent

        unsupported_i = set(csp.curr_domains[Xi]) - Si_p
        for x in unsupported_i:
            csp.prune(Xi, x, removals)
        if unsupported_i:
            for Xk in csp.neighbors[Xi]:
                if Xk != Xj:
                    queue.add((Xk, Xi))

        if (Xj, Xi) not in queue:
            continue
        # The reverse arc is pending: handle it now using the supports
        # already collected.
        if isinstance(queue, set):
            # or queue -= {(Xj, Xi)} or queue.remove((Xj, Xi))
            queue.difference_update({(Xj, Xi)})
        else:
            # NOTE(review): the tuple itself is passed as the iterable here,
            # so this looks like it asks to remove the elements Xj and Xi
            # rather than the arc (Xj, Xi) - confirm against the queue
            # type's difference_update before changing it.
            queue.difference_update((Xj, Xi))

        # The values of Xj supported by Xi are Sj_p plus those members of
        # Sj_u that turn out to be supported by some value in Si_p.
        for vj_p in Sj_u:
            for vi_p in Si_p:
                checks += 1
                if csp.constraints(Xj, vj_p, Xi, vi_p):
                    Sj_p.add(vj_p)
                    break

        unsupported_j = set(csp.curr_domains[Xj]) - Sj_p
        for x in unsupported_j:
            csp.prune(Xj, x, removals)
        if unsupported_j:
            for Xk in csp.neighbors[Xj]:
                if Xk != Xi:
                    queue.add((Xk, Xj))
    return True, checks  # CSP is satisfiable
def partition(csp, Xi, Xj, checks=0):
    """Split the domains of Xi and Xj by known support across the arc.

    Returns (Si_p, Sj_p, Sj_u, checks):
        Si_p    values of Xi for which a supporting value of Xj was found
        Sj_p    values of Xj that were used as such a support
        Sj_u    the remaining values of Xj
        checks  the incoming counter plus one per constraint evaluation
    """
    supported_i = set()
    supported_j = set()
    domain_j = set(csp.curr_domains[Xj])
    for vi in csp.curr_domains[Xi]:
        # Try the still-unknown values of Xj first: a hit there settles the
        # support of two values (vi and vj) with a single check.
        found = False
        for vj in domain_j - supported_j:
            checks += 1
            if csp.constraints(Xi, vi, Xj, vj):
                supported_i.add(vi)
                supported_j.add(vj)
                found = True
                break
        if found:
            continue
        # Only when that fails, fall back to single-support checks against
        # the values of Xj already known to be supported.
        for vj in supported_j:
            checks += 1
            if csp.constraints(Xi, vi, Xj, vj):
                supported_i.add(vi)
                break
    return supported_i, supported_j, domain_j - supported_j, checks
# Constraint Propagation with AC4
def AC4(csp, queue=None, removals=None, arc_heuristic=dom_j_up):
    """Enforce arc consistency with AC4 (support counting).

    Arguments match AC3: `queue` is the set of arcs to process (all
    (variable, neighbor) arcs when None) and `removals` collects pruned
    (var, value) pairs. Returns (consistent, checks).
    """
    if queue is None:
        queue = {(Xi, Xk) for Xi in csp.variables for Xk in csp.neighbors[Xi]}
    csp.support_pruning()
    queue = arc_heuristic(csp, queue)
    support_counter = Counter()       # (Xi, x, Xj) -> number of y in Xj supporting Xi=x
    supported_by = defaultdict(set)   # (Xj, y) -> {(Xi, x), ...} pairs that y supports
    pending = []                      # pruned (var, value) pairs still to propagate
    checks = 0

    # construction and initialization of support sets
    while queue:
        Xi, Xj = queue.pop()
        revised = False
        for x in list(csp.curr_domains[Xi]):
            for y in csp.curr_domains[Xj]:
                if csp.constraints(Xi, x, Xj, y):
                    support_counter[(Xi, x, Xj)] += 1
                    supported_by[(Xj, y)].add((Xi, x))
                checks += 1
            if support_counter[(Xi, x, Xj)] == 0:
                csp.prune(Xi, x, removals)
                revised = True
                pending.append((Xi, x))
        if revised and not csp.curr_domains[Xi]:
            return False, checks  # CSP is inconsistent

    # propagation of removed values
    while pending:
        Xj, y = pending.pop()
        for Xi, x in supported_by[(Xj, y)]:
            revised = False
            if x in list(csp.curr_domains[Xi]):
                # Xi=x just lost the support that Xj=y provided
                support_counter[(Xi, x, Xj)] -= 1
                if support_counter[(Xi, x, Xj)] == 0:
                    csp.prune(Xi, x, removals)
                    revised = True
                    pending.append((Xi, x))
            if revised and not csp.curr_domains[Xi]:
                return False, checks  # CSP is inconsistent
    return True, checks  # CSP is satisfiable
# ______________________________________________________________________________
# CSP Backtracking Search
# Variable ordering
def first_unassigned_variable(assignment, csp):
    """Default variable order: the first of csp.variables not in assignment."""
    unassigned = [candidate for candidate in csp.variables if candidate not in assignment]
    return first(unassigned)
def mrv(assignment, csp):
    """Minimum-remaining-values heuristic: pick an unassigned variable with
    the fewest legal values, breaking ties at random."""
    unassigned = [v for v in csp.variables if v not in assignment]

    def remaining_values(var):
        return num_legal_values(csp, var, assignment)

    return argmin_random_tie(unassigned, key=remaining_values)
def num_legal_values(csp, var, assignment):
    """Return how many values are still available to var.

    With pruned domains in place this is the size of var's current domain;
    otherwise it is the number of values in the original domain that have no
    conflict with the assignment.
    """
    if not csp.curr_domains:
        return count(csp.nconflicts(var, val, assignment) == 0 for val in csp.domains[var])
    return len(csp.curr_domains[var])
# ADDED CODE HERE: function dom/wdeg
def dom_wdeg(assignment, csp):
    """dom/wdeg variable ordering.

    Pick the unassigned variable with the smallest ratio of current domain
    size to weighted degree, where the weighted degree of a variable is the
    sum of csp.weight[(var, nb)] over its unassigned neighbors nb. On equal
    ratios the variable that comes first in csp.variables wins.

    Falls back to first_unassigned_variable when no pruned domains exist yet
    (csp.curr_domains is None) or when every unassigned variable has a
    weighted degree of 0.
    """
    # Without current domains there is no domain size to rank by.
    if csp.curr_domains is None:
        return first_unassigned_variable(assignment, csp)

    best_var = None
    best_ratio = float('inf')
    for var in csp.variables:
        if var in assignment:
            continue
        weighted_degree = sum(csp.weight[(var, nb)]
                              for nb in csp.neighbors[var]
                              if nb not in assignment)
        if weighted_degree == 0:
            continue  # no unassigned neighbor: skip to avoid dividing by zero
        ratio = len(csp.curr_domains[var]) / weighted_degree
        if ratio < best_ratio:
            best_ratio = ratio
            best_var = var

    # Compare against None explicitly: a variable may itself be falsy
    # (e.g. the integer 0) and must still be returned when it wins.
    if best_var is not None:
        return best_var
    return first_unassigned_variable(assignment, csp)
# Value ordering
def unordered_domain_values(var, assignment, csp):
    """Default value order: the values csp.choices gives for var, as is."""
    values = csp.choices(var)
    return values
def lcv(var, assignment, csp):
    """Least-constraining-values heuristic: order var's choices by the
    number of conflicts each one has with the assignment, fewest first."""

    def conflicts_for(val):
        return csp.nconflicts(var, val, assignment)

    ordered = list(csp.choices(var))
    ordered.sort(key=conflicts_for)
    return ordered
# Inference
def no_inference(csp, var, value, assignment, removals):
    """Inference step that does nothing: always consistent, zero checks.

    The pair shape (result, checks) matches the other inference functions,
    which backtracking_search unpacks.
    """
    consistent, checks = True, 0
    return consistent, checks
def forward_checking(csp, var, value, assignment, removals, select_unassigned_variable=mrv):
    """Prune the values of var's unassigned neighbors that conflict with var=value.

    Returns (consistent, checks), where `checks` counts the constraint
    evaluations made. When a neighbor B ends up with an empty domain, the
    weight of (B, var) is incremented for dom/wdeg and the function returns
    (False, checks) immediately. `select_unassigned_variable` is not used
    in the body.
    """
    csp.support_pruning()
    checks = 0
    for B in csp.neighbors[var]:
        if B in assignment:
            continue
        for b in list(csp.curr_domains[B]):
            checks += 1
            if csp.constraints(var, value, B, b):
                continue
            csp.prune(B, b, removals)
        # revise() is only reached through MAC, so forward checking keeps
        # the dom/wdeg weights up to date itself on a domain wipe-out.
        if not csp.curr_domains[B]:
            csp.weight[(B, var)] += 1
            return False, checks
    return True, checks
def mac(csp, var, value, assignment, removals, constraint_propagation=AC3):
    """Maintain arc consistency: run constraint propagation on the arcs
    (X, var) for every neighbor X of var and return its result."""
    arcs_into_var = {(X, var) for X in csp.neighbors[var]}
    return constraint_propagation(csp, arcs_into_var, removals)
# The search, proper
def backtracking_search(csp, select_unassigned_variable=first_unassigned_variable,
                        order_domain_values=unordered_domain_values, inference=no_inference):
    """[Figure 6.5] Depth-first backtracking search for a complete assignment.

    `inference` must return a (consistent, checks) pair; the checks are
    added to csp.total_checks.

    Returns (result, csp.nassigns, csp.total_checks), where `result` is the
    assignment found or None when there is none.
    """

    def backtrack(assignment):
        if len(assignment) == len(csp.variables):
            return assignment
        var = select_unassigned_variable(assignment, csp)
        for value in order_domain_values(var, assignment, csp):
            if csp.nconflicts(var, value, assignment) != 0:
                continue
            csp.assign(var, value, assignment)
            removals = csp.suppose(var, value)
            consistent, checks = inference(csp, var, value, assignment, removals)
            csp.total_checks += checks
            if consistent:
                solution = backtrack(assignment)
                if solution is not None:
                    return solution
            # Dead end below this value: undo the supposition and its inferences.
            csp.restore(removals)
        csp.unassign(var, assignment)
        return None

    result = backtrack({})
    assert result is None or csp.goal_test(result)
    return result, csp.nassigns, csp.total_checks
# ______________________________________________________________________________
# Min-conflicts Hill Climbing search for CSPs
def min_conflicts(csp, max_steps=100000):
    """Solve a CSP by stochastic hill climbing on the number of conflicts.

    Returns (assignment, csp.nassigns); the assignment is None when no
    conflict-free assignment is reached within max_steps steps.
    """
    # Start from a complete assignment (probably with conflicts), built one
    # variable at a time.
    current = {}
    csp.current = current
    for var in csp.variables:
        csp.assign(var, min_conflicts_value(csp, var, current), current)
    # Then repeatedly re-assign a randomly chosen conflicted variable.
    for _ in range(max_steps):
        conflicted = csp.conflicted_vars(current)
        if not conflicted:
            return current, csp.nassigns
        var = random.choice(conflicted)
        csp.assign(var, min_conflicts_value(csp, var, current), current)
    return None, csp.nassigns
def min_conflicts_value(csp, var, current):
    """Return the value in var's domain with the fewest conflicts against
    current. If there is a tie, choose at random."""

    def conflicts_for(val):
        return csp.nconflicts(var, val, current)

    return argmin_random_tie(csp.domains[var], key=conflicts_for)