1 from squander.gates.gates_Wrapper
import CNOT, CH, CZ, CRY
3 from squander
import utils
4 from itertools
import dropwhile
5 from typing
import List, Tuple
13 return ({gate.get_Target_Qbit(), gate.get_Control_Qbit()}
14 if isinstance(gate, (CH, CRY, CNOT, CZ))
else {gate.get_Target_Qbit()})
26 top_circuit = Circuit(c.get_Qbit_Num())
29 gate_dict = {i: gate
for i, gate
in enumerate(c.get_Gates())}
30 g, rg = { i: set()
for i
in gate_dict }, { i: set()
for i
in gate_dict }
32 for gate
in gate_dict:
33 for child
in c.get_Children(gate_dict[gate]):
37 L, S = [], {m
for m
in rg
if len(rg[m]) == 0}
40 def partition_condition(gate):
41 return len(
get_qubits(gate_dict[gate]) | curr_partition) > max_qubit
43 c = Circuit(c.get_Qbit_Num())
44 curr_partition = set()
51 n = next(dropwhile(partition_condition, S),
None)
52 elif isinstance(next(iter(preparts[0])), tuple):
53 Scomp = {(frozenset(
get_qubits(gate_dict[x])), gate_dict[x].get_Name()): x
for x
in S}
54 n = next(iter(Scomp.keys() & preparts[len(parts)-1]),
None)
55 if n
is not None: n = Scomp[n]
57 n = next(iter(S & preparts[len(parts)-1]),
None)
58 assert (n
is None) == (len(preparts[len(parts)-1]) == len(parts[-1]))
62 top_circuit.add_Circuit(c)
63 total += len(c.get_Gates())
65 curr_partition = set()
66 c = Circuit(c.get_Qbit_Num())
68 if preparts
is None: n = next(iter(S))
69 elif isinstance(next(iter(preparts[0])), tuple):
70 Scomp = {(frozenset(
get_qubits(gate_dict[x])), gate_dict[x].get_Name()): x
for x
in S}
71 n = next(iter(Scomp.keys() & preparts[len(parts)-1]),
None)
72 if n
is not None: n = Scomp[n]
73 else: n = next(iter(S & preparts[len(parts)-1]))
79 c.add_Gate(gate_dict[n])
90 assert len(rg[n]) == 0
91 for child
in set(g[n]):
98 top_circuit.add_Circuit(c)
99 total += len(c.get_Gates())
100 assert total == len(gate_dict)
102 return top_circuit, param_order, parts
112 gatedict = {i: gate
for i, gate
in enumerate(c.get_Gates())}
113 all_qubits = set(range(c.get_Qbit_Num()))
115 num_gates = len(gatedict)
116 prob = pulp.LpProblem(
"MaxSinglePartition", pulp.LpMinimize)
117 a = pulp.LpVariable.dicts(
"a", (i
for i
in range(num_gates)), cat=
"Binary")
118 x = pulp.LpVariable.dicts(
"x", (i
for i
in range(num_gates)), cat=
"Binary")
119 z = pulp.LpVariable.dicts(
"z", (q
for q
in all_qubits), cat=
"Binary")
122 prob += x[i] + a[i] <= 1
126 for q
in gate_qubits:
129 prob += pulp.lpSum(z[q]
for q
in all_qubits) <= max_qubits_per_partition
131 for part
in prevparts:
134 prob += a[i] == a[next(iter(part))]
137 for i
in c.get_Children(gatedict[j]):
139 prob += x[j] + a[j] >= x[i]
140 prob.setObjective(-pulp.lpSum(x[i]
for i
in range(num_gates)))
143 prob.solve(pulp.GUROBI(manageEnv=
True, msg=
False, timeLimit=180, Threads=os.cpu_count()))
145 gates = {i
for i
in range(num_gates)
if int(pulp.value(x[i]))}
146 qubits = set.union(*(
get_qubits(gatedict[i])
for i
in gates))
159 gatedict = {i: gate
for i, gate
in enumerate(c.get_Gates())}
160 num_gates = len(gatedict)
162 while sum(len(x)
for x
in parts) != num_gates:
164 gatedict = {i: gate
for i, gate
in enumerate(c.get_Gates())}
165 partdict = {gate: i
for i, part
in enumerate(parts)
for gate
in part}
166 g, rg = {i: set()
for i
in range(len(parts))}, {i: set()
for i
in range(len(parts))}
167 for gate
in gatedict:
168 for child
in c.get_Children(gatedict[gate]):
169 if partdict[gate] == partdict[child]:
continue 170 g[partdict[gate]].add(partdict[child])
171 rg[partdict[child]].add(partdict[gate])
172 L, S = [], {m
for m
in rg
if len(rg[m]) == 0}
176 assert len(rg[n]) == 0
182 assert len(L) == len(parts)
189 Call to reorder circuit parameters based on partitioned execution order 193 params ( np.ndarray ) Original parameter array 195 param_order (List[int] ) Tuples specifying new parameter positions: source_idx, dest_idx, param_count 199 Returns with the reordered Reordered parameter array 202 reordered = np.empty_like(params)
204 for s_idx, n_idx, n_params
in param_order:
205 reordered[n_idx:n_idx + n_params] = params[s_idx:s_idx + n_params]
211 def PartitionCircuit( circ: Circuit, parameters: np.ndarray, max_partition_size: int, use_ilp : bool =
False) -> tuple[Circuit, np.ndarray]:
213 Call to partition a circuit 217 circ ( Circuit ) A circuit to be partitioned 219 parameters ( np.ndarray ) A parameter array associated with the input circuit 221 max_partition_size (int) : The maximal number of qubits in the partitions 223 use_ilp (bool, optional) Set True to use ILP partitioning startegy (slow, but giving optimal result), or False (default) to use Kahn topological sorting 227 Returns with the paritioned circuit and the associated parameter array. Partitions are organized into subcircuits of the resulting circuit 234 partitioned_circ, param_order, _ =
kahn_partition(circ, max_partition_size)
239 return partitioned_circ, param_reordered
242 def PartitionCircuitQasm(filename: str, max_partition_size: int, use_ilp : bool =
False) -> tuple[Circuit, np.ndarray]:
244 Call to partition a circuit loaded from a qasm file 248 filename ( str ) A path to a qasm file to be imported 250 max_partition_size (int) : The maximal number of qubits in the partitions 252 use_ilp (bool, optional) Set True to use ILP partitioning startegy (slow, but giving optimal result), or False (default) otherwise 256 Returns with the paritioned circuit and the associated parameter array. Partitions are organized into subcircuits of the resulting circuit 259 circ, parameters = utils.qasm_to_squander_circuit(filename)
263 if __name__ ==
"__main__":
def get_Parameter_Start_Index(self)
def translate_param_order
def find_next_biggest_partition(c, max_qubits_per_partition, prevparts=None)
def ilp_max_partitions(c, max_qubits_per_partition)
def get_Parameter_Num(self)
Call to get the number of free parameters in the gate structure used for the decomposition.
def kahn_partition(c, max_qubit, preparts=None)