Sequential Quantum Gate Decomposer  v1.9.3
Powerful decomposition of general unitarias into one- and two-qubit gates gates
qgd_Wide_Circuit_Optimization.py
Go to the documentation of this file.
1 """
2 Implementation to optimize wide circuits (i.e. circuits with many qubits) by partitioning the circuit into smaller partitions and redecompose the smaller partitions
3 """
4 
5 from squander.decomposition.qgd_N_Qubit_Decomposition_Tree_Search import qgd_N_Qubit_Decomposition_Tree_Search as N_Qubit_Decomposition_Tree_Search
6 from squander.decomposition.qgd_N_Qubit_Decomposition_Tabu_Search import qgd_N_Qubit_Decomposition_Tabu_Search as N_Qubit_Decomposition_Tabu_Search
7 from squander.gates.qgd_Circuit import qgd_Circuit as Circuit
8 from squander.utils import CompareCircuits
9 
10 import numpy as np
11 from qiskit import QuantumCircuit
12 
13 from typing import List, Callable
14 
15 import multiprocessing as mp
16 from multiprocessing import Process, Pool
17 import os
18 
19 
20 from squander.partitioning.partition import (
21  get_qubits,
22  PartitionCircuit
23 )
24 
25 
26 
27 
28 
29 
30 def CNOTGateCount( circ: Circuit ) -> int :
31  """
32  Call to get the number of CNOT gates in the circuit
33 
34 
35  Args:
36 
37  circ (Circuit) A squander circuit representation
38 
39 
40  Return:
41 
42  Returns with the CNOT gate count
43 
44 
45  """
46 
47  if not isinstance(circ, Circuit ):
48  raise Exception("The input parameters should be an instance of Squander Circuit")
49 
50  gate_counts = circ.get_Gate_Nums()
51 
52  return gate_counts.get('CNOT', 0)
53 
54 
55 
56 
57 
58 
59 
60 
62  """
63  Class implementing the optimization of wide circuits (i.e. circuits with many qubits) by
64  partitioning the circuit into smaller partitions and redecompose the smaller partitions
65 
66  """
67 
68  def __init__( self, config ):
69 
70  config.setdefault('strategy', 'TreeSearch')
71  config.setdefault('parallel', 0 )
72  config.setdefault('verbosity', 0 )
73  config.setdefault('tolerance', 1e-8 )
74  config.setdefault('test_subcircuits', False )
75  config.setdefault('test_final_circuit', True )
76  config.setdefault('max_partition_size', 3 )
77 
78  #testing the fields of config
79  strategy = config[ 'strategy' ]
80  allowed_startegies = ['TreeSearch', 'TabuSearch' ]
81  if not strategy in allowed_startegies :
82  raise Exception(f"The decomposition startegy should be either of {allowed_startegies}, got {startegy}.")
83 
84 
85  parallel = config[ 'parallel' ]
86  allowed_parallel = [0, 1, 2 ]
87  if not parallel in allowed_parallel :
88  raise Exception(f"The parallel configuration should be either of {allowed_parallel}, got {parallel}.")
89 
90 
91  verbosity = config[ 'verbosity' ]
92  if not isinstance( verbosity, int) :
93  raise Exception(f"The verbosity parameter should be an integer.")
94 
95 
96  tolerance = config[ 'tolerance' ]
97  if not isinstance( tolerance, float) :
98  raise Exception(f"The tolerance parameter should be a float.")
99 
100 
101  test_subcircuits = config[ 'test_subcircuits' ]
102  if not isinstance( test_subcircuits, bool) :
103  raise Exception(f"The test_subcircuits parameter should be a bool.")
104 
105 
106  test_final_circuit = config[ 'test_final_circuit' ]
107  if not isinstance( test_final_circuit, bool) :
108  raise Exception(f"The test_final_circuit parameter should be a bool.")
109 
110 
111 
112  max_partition_size = config[ 'max_partition_size' ]
113  if not isinstance( max_partition_size, int) :
114  raise Exception(f"The max_partition_size parameter should be an integer.")
115 
116  self.config = config
117 
118 
119  self.max_partition_size = max_partition_size
120 
121 
122 
123  def ConstructCircuitFromPartitions( self, circs: List[Circuit], parameter_arrs: [List[np.ndarray]] ) -> (Circuit, np.ndarray):
124  """
125  Call to construct the wide quantum circuit from the partitions.
126 
127 
128  Args:
129 
130  circs ( List[Circuit] ) A list of Squander circuits to be compared
131 
132  parameter_arrs ( List[np.ndarray] ) A list of parameter arrays associated with the sqaunder circuits
133 
134  Return:
135 
136  Returns with the constructed circuit and the corresponding parameter array
137 
138 
139  """
140 
141  if not isinstance( circs, list ):
142  raise Exception("First argument should be a list of squander circuits")
143 
144  if not isinstance( parameter_arrs, list ):
145  raise Exception("Second argument should be a list of numpy arrays")
146 
147  if len(circs) != len(parameter_arrs) :
148  raise Exception("The first two arguments should be of the same length")
149 
150 
151  qbit_num = circs[0].get_Qbit_Num()
152 
153 
154 
155  wide_parameters = np.concatenate( parameter_arrs, axis=0 )
156 
157 
158  wide_circuit = Circuit( qbit_num )
159 
160  for circ in circs:
161  wide_circuit.add_Circuit( circ )
162 
163 
164  assert wide_circuit.get_Parameter_Num() == wide_parameters.size, \
165  f"Mismatch in the number of parameters: {wide_circuit.get_Parameter_Num()} vs {wide_parameters.size}"
166 
167 
168 
169  return wide_circuit, wide_parameters
170 
171 
172  @staticmethod
173  def DecomposePartition( Umtx: np.ndarray, config: dict ) -> Circuit:
174  """
175  Call to run the decomposition of a given unitary Umtx, typically associated with the circuit
176  partition to be optimized
177 
178 
179  Args:
180 
181  Umtx (np.ndarray) A complex typed unitary to be decomposed
182 
183 
184  Return:
185 
186  Returns with the the decoposed circuit structure and with the corresponding gate parameters
187 
188 
189  """
190  strategy = config["strategy"]
191  if strategy == "TreeSearch":
192  cDecompose = N_Qubit_Decomposition_Tree_Search( Umtx.conj().T, config=config, accelerator_num=0 )
193  elif strategy == "TabuSearch":
194  cDecompose = N_Qubit_Decomposition_Tabu_Search( Umtx.conj().T, config=config, accelerator_num=0 )
195  else:
196  raise Exception(f"Unsupported decomposition type: {strategy}")
197 
198 
199  tolerance = config["tolerance"]
200 
201 
202  cDecompose.set_Verbose( config["verbosity"] )
203  cDecompose.set_Cost_Function_Variant( 3 )
204  cDecompose.set_Optimization_Tolerance( tolerance )
205 
206 
207  # adding new layer to the decomposition until threshold
208  cDecompose.set_Optimizer( "BFGS" )
209 
210  # starting the decomposition
211  cDecompose.Start_Decomposition()
212 
213 
214  squander_circuit = cDecompose.get_Circuit()
215  parameters = cDecompose.get_Optimized_Parameters()
216 
217 
218  print( "Decomposition error: ", cDecompose.get_Decomposition_Error() )
219 
220  if tolerance < cDecompose.get_Decomposition_Error():
221  return None, None
222 
223 
224  return squander_circuit, parameters
225 
226 
227 
228  @staticmethod
229  def CompareAndPickCircuits( circs: List[Circuit], parameter_arrs: [List[np.ndarray]], metric : Callable[ [Circuit], int ] = CNOTGateCount ) -> Circuit:
230  """
231  Call to pick the most optimal circuit corresponding a specific metric. Looks for the circuit
232  with the minimal metric value.
233 
234 
235  Args:
236 
237  circs ( List[Circuit] ) A list of Squander circuits to be compared
238 
239  parameter_arrs ( List[np.ndarray] ) A list of parameter arrays associated with the sqaunder circuits
240 
241  metric (optional) The metric function to decide which input circuit is better.
242 
243 
244  Return:
245 
246  Returns with the chosen circuit and the corresponding parameter array
247 
248 
249  """
250 
251  if not isinstance( circs, list ):
252  raise Exception("First argument should be a list of squander circuits")
253 
254  if not isinstance( parameter_arrs, list ):
255  raise Exception("Second argument should be a list of numpy arrays")
256 
257  if len(circs) != len(parameter_arrs) :
258  raise Exception("The first two arguments should be of the same length")
259 
260 
261  metrics = [metric( circ ) for circ in circs]
262 
263  metrics = np.array( metrics )
264 
265  min_idx = np.argmin( metrics )
266 
267  return circs[ min_idx ], parameter_arrs[ min_idx ]
268 
269 
270 
271  @staticmethod
272  def PartitionDecompositionProcess( subcircuit: Circuit, subcircuit_parameters: np.ndarray, config: dict ) -> (Circuit, np.ndarray):
273  """
274  Implements an asynchronous process to decompose a unitary associated with a partition in a large
275  quantum circuit
276 
277 
278  Args:
279 
280  circ ( Circuit ) A subcircuit representing a partition
281 
282  parameters ( np.ndarray ) A parameter array associated with the input circuit
283 
284 
285  """
286 
287 
288  qbit_num_orig_circuit = subcircuit.get_Qbit_Num()
289  involved_qbits = subcircuit.get_Qbits()
290  qbit_num = len( involved_qbits )
291 
292  # create qbit map:
293  qbit_map = {}
294  for idx in range( len(involved_qbits) ):
295  qbit_map[ involved_qbits[idx] ] = idx
296 
297  # remap the subcircuit to a smaller qubit register
298  remapped_subcircuit = subcircuit.Remap_Qbits( qbit_map, qbit_num )
299 
300  # get the unitary representing the circuit
301  unitary = remapped_subcircuit.get_Matrix( subcircuit_parameters )
302 
303  # decompose a small unitary into a new circuit
304  decomposed_circuit, decomposed_parameters = qgd_Wide_Circuit_Optimization.DecomposePartition( unitary, config )
305 
306  if decomposed_circuit is None:
307  decomposed_circuit = subcircuit
308  decomposed_parameters = subcircuit_parameters
309 
310 
311  # create inverse qbit map:
312  inverse_qbit_map = {}
313  for key, value in qbit_map.items():
314  inverse_qbit_map[ value ] = key
315 
316  # remap the decomposed circuit in order to insert it into a large circuit
317  new_subcircuit = decomposed_circuit.Remap_Qbits( inverse_qbit_map, qbit_num_orig_circuit )
318 
319 
320  if config["test_subcircuits"]:
321  CompareCircuits( subcircuit, subcircuit_parameters, new_subcircuit, decomposed_parameters, parallel=config["parallel"] )
322 
323 
324 
325  new_subcircuit = new_subcircuit.get_Flat_Circuit()
326 
327  return new_subcircuit, decomposed_parameters
328 
329 
330 
331 
332  def OptimizeWideCircuit( self, circ: Circuit, parameters: np.ndarray ) -> (Circuit, np.ndarray):
333  """
334  Call to optimize a wide circuit (i.e. circuits with many qubits) by
335  partitioning the circuit into smaller partitions and redecompose the smaller partitions
336 
337 
338  Args:
339 
340  circ ( Circuit ) A circuit to be partitioned
341 
342  parameters ( np.ndarray ) A parameter array associated with the input circuit
343 
344  Return:
345 
346  Returns with the optimized circuit and the corresponding parameter array
347 
348  """
349 
350  partitined_circuit, parameters = PartitionCircuit( circ, parameters, self.max_partition_size )
351 
352 
353  qbit_num_orig_circuit = partitined_circuit.get_Qbit_Num()
354 
355 
356  subcircuits = partitined_circuit.get_Gates()
357 
358 
359 
360 
361  # the list of optimized subcircuits
362  optimized_subcircuits = [None] * len(subcircuits)
363 
364  # the list of parameters associated with the optimized subcircuits
365  optimized_parameter_list = [None] * len(subcircuits)
366 
367  # list of AsyncResult objects
368  async_results = [None] * len(subcircuits)
369 
370  with Pool(processes=mp.cpu_count()) as pool:
371 
372  # code for iterate over partitions and optimize them
373  for partition_idx, subcircuit in enumerate( subcircuits ):
374 
375 
376  # isolate the parameters corresponding to the given sub-circuit
377  start_idx = subcircuit.get_Parameter_Start_Index()
378  end_idx = subcircuit.get_Parameter_Start_Index() + subcircuit.get_Parameter_Num()
379  subcircuit_parameters = parameters[ start_idx:end_idx ]
380 
381 
382 
383  # callback function done on the master process to compare the new decomposed and the original suncircuit
384  callback_fnc = lambda x : self.CompareAndPickCircuits( [subcircuit, x[0]], [subcircuit_parameters, x[1]] )
385 
386  # call a process to decompose a subcircuit
387  async_results[partition_idx] = pool.apply_async( self.PartitionDecompositionProcess, (subcircuit, subcircuit_parameters, self.config), callback=callback_fnc )
388 
389 
390 
391  # code for iterate over async results and retrieve the new subcircuits
392  for partition_idx, subcircuit in enumerate( subcircuits ):
393 
394  new_subcircuit, new_parameters = async_results[partition_idx].get( timeout = 1800 )
395 
396  '''
397  if subcircuit != new_subcircuit:
398 
399  print( "original subcircuit: ", subcircuit.get_Gate_Nums())
400  print( "reoptimized subcircuit: ", new_subcircuit.get_Gate_Nums())
401  '''
402 
403  optimized_subcircuits[ partition_idx ] = new_subcircuit
404  optimized_parameter_list[ partition_idx ] = new_parameters
405 
406 
407  # construct the wide circuit from the optimized suncircuits
408  wide_circuit, wide_parameters = self.ConstructCircuitFromPartitions( optimized_subcircuits, optimized_parameter_list )
409 
410 
411  print( "original circuit: ", partitined_circuit.get_Gate_Nums())
412  print( "reoptimized circuit: ", wide_circuit.get_Gate_Nums())
413 
414 
415 
416  if self.config["test_final_circuit"]:
417  CompareCircuits( partitined_circuit, parameters, wide_circuit, wide_parameters )
418 
419 
420  return wide_circuit, wide_parameters
421 
A base class to determine the decomposition of an N-qubit unitary into a sequence of CNOT and U3 gate...
A base class to determine the decomposition of an N-qubit unitary into a sequence of CNOT and U3 gate...
def PartitionCircuit
Definition: partition.py:211
def get_Qbit_Num(self)
Call to get the number of qubits in the circuit.
def CompareCircuits
Definition: utils.py:148