Source code for netket.operator._local_liouvillian

# Copyright 2021 The NetKet Authors - All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List as PyList

import numpy as np
import numba
from numba import jit
from numba.typed import List

from scipy.sparse.linalg import LinearOperator

from ._discrete_operator import DiscreteOperator
from ._local_operator import LocalOperator
from ._abstract_super_operator import AbstractSuperOperator


class LocalLiouvillian(AbstractSuperOperator):
    """
    LocalLiouvillian super-operator, acting on the DoubledHilbert (tensor product) space
    ℋ⊗ℋ.

    Internally it uses :ref:`netket.operator.LocalOperator` everywhere.


    The Liouvillian is defined according to the definition:

    .. math ::

        \\mathcal{L} = -i \\left[ \\hat{H}, \\hat{\\rho}\\right] + \\sum_i \\left[ \\hat{L}_i\\hat{\\rho}\\hat{L}_i^\\dagger -
            \\left\\{ \\hat{L}_i^\\dagger\\hat{L}_i, \\hat{\\rho} \\right\\} \\right]

    which generates the dynamics according to the equation

    .. math ::

        \\frac{d\\hat{\\rho}}{dt} = \\mathcal{L}\\hat{\\rho}

    Internally, it stores the non-hermitian hamiltonian

    .. math ::

        \\hat{H}_{nh} = \\hat{H} - \\sum_i \\frac{i}{2}\\hat{L}_i^\\dagger\\hat{L}_i

    That is then composed with the jump operators in the inner kernel with the formula:

    .. math ::

        \\mathcal{L} = -i \\hat{H}_{nh}\\hat{\\rho} +i\\hat{\\rho}\\hat{H}_{nh}^\\dagger + \\sum_i \\hat{L}_i\\hat{\\rho}\\hat{L}_i^\\dagger

    """

    def __init__(
        self,
        ham: DiscreteOperator,
        jump_ops: PyList[DiscreteOperator] = [],
        dtype=complex,
    ):
        super().__init__(ham.hilbert)

        self._H = ham
        self._jump_ops = [op.copy(dtype=dtype) for op in jump_ops]  # to accept dicts
        self._Hnh = ham
        self._max_dissipator_conn_size = 0
        self._max_conn_size = 0

        self._dtype = dtype

        self._compute_hnh()

    @property
    def dtype(self):
        return self._dtype

    @property
    def is_hermitian(self):
        return False

    @property
    def hamiltonian(self) -> LocalOperator:
        """The hamiltonian of this Liouvillian"""
        return self._H

    @property
    def hamiltonian_nh(self) -> LocalOperator:
        """The non hermitian Local Operator part of the Liouvillian"""
        return self._Hnh

    @property
    def jump_operators(self) -> PyList[LocalOperator]:
        """The list of local operators in this Liouvillian"""
        return self._jump_ops

    @property
    def max_conn_size(self) -> int:
        """The maximum number of non zero ⟨x|O|x'⟩ for every x."""
        return self._max_conn_size

    def _compute_hnh(self):
        # There is no i here because it's inserted in the kernel
        Hnh = 1.0 * self._H
        self._max_dissipator_conn_size = 0
        for L in self._jump_ops:
            Hnh = Hnh - 0.5j * L.conjugate().transpose() @ L
            self._max_dissipator_conn_size += L.max_conn_size**2

        self._Hnh = Hnh.collect()

        max_conn_size = self._max_dissipator_conn_size + 2 * Hnh.max_conn_size
        self._max_conn_size = max_conn_size
        self._xprime = np.empty((max_conn_size, self.hilbert.size))
        self._xr_prime = np.empty((max_conn_size, self.hilbert.physical.size))
        self._xc_prime = np.empty((max_conn_size, self.hilbert.physical.size))
        self._xrv = self._xprime[:, 0 : self.hilbert.physical.size]
        self._xcv = self._xprime[
            :, self.hilbert.physical.size : 2 * self.hilbert.physical.size
        ]
        self._mels = np.empty(max_conn_size, dtype=self.dtype)

        self._xprime_f = np.empty((max_conn_size, self.hilbert.size))
        self._mels_f = np.empty(max_conn_size, dtype=self.dtype)

[docs] def add_jump_operator(self, op): self._jump_ops.append(op) self._compute_hnh()
[docs] def add_jump_operators(self, ops): for op in ops: self._jump_ops.append(op) self._compute_hnh()
[docs] def get_conn(self, x): n_sites = x.shape[0] // 2 xr, xc = x[0:n_sites], x[n_sites : 2 * n_sites] i = 0 xrp, mel_r = self._Hnh.get_conn(xr) self._xrv[i : i + len(mel_r), :] = xrp self._xcv[i : i + len(mel_r), :] = xc self._mels[i : i + len(mel_r)] = -1j * mel_r i = i + len(mel_r) xcp, mel_c = self._Hnh.get_conn(xc) self._xrv[i : i + len(mel_c), :] = xr self._xcv[i : i + len(mel_c), :] = xcp self._mels[i : i + len(mel_r)] = 1j * np.conj(mel_c) i = i + len(mel_c) for L in self._jump_ops: L_xrp, L_mel_r = L.get_conn(xr) L_xcp, L_mel_c = L.get_conn(xc) nr = len(L_mel_r) nc = len(L_mel_c) # start filling batches for r in range(nr): self._xrv[i : i + nc, :] = L_xrp[r, :] self._xcv[i : i + nc, :] = L_xcp self._mels[i : i + nc] = L_mel_r[r] * np.conj(L_mel_c) i = i + nc return np.copy(self._xprime[0:i, :]), np.copy(self._mels[0:i])
# pad option pads all sections to have the same (biggest) size. # to avoid using the biggest possible size, we dynamically check what is # the biggest size for the current size... # TODO: investigate if this is worthless
[docs] def get_conn_flattened(self, x, sections, pad=False): batch_size = x.shape[0] N = x.shape[1] // 2 n_jops = len(self.jump_operators) assert sections.shape[0] == batch_size # Separate row and column inputs xr, xc = x[:, 0:N], x[:, N : 2 * N] # Compute all flattened connections of each term sections_r = np.empty(batch_size, dtype=np.int64) sections_c = np.empty(batch_size, dtype=np.int64) xr_prime, mels_r = self._Hnh.get_conn_flattened(xr, sections_r) xc_prime, mels_c = self._Hnh.get_conn_flattened(xc, sections_c) if pad: # if else to accomodate for batches of 1 element, because # sections don't start from 0-index... # TODO: if we ever switch to 0-indexing, remove this. if batch_size > 1: max_conns_r = np.max(np.diff(sections_r)) max_conns_c = np.max(np.diff(sections_c)) else: max_conns_r = sections_r[0] max_conns_c = sections_c[0] max_conns_Lrc = 0 #  Must type those lists otherwise, if they are empty, numba # cannot infer their type L_xrps = List.empty_list(numba.typeof(x.dtype)[:, :]) L_xcps = List.empty_list(numba.typeof(x.dtype)[:, :]) L_mel_rs = List.empty_list(numba.typeof(self.dtype())[:]) L_mel_cs = List.empty_list(numba.typeof(self.dtype())[:]) sections_Lr = np.empty(batch_size * n_jops, dtype=np.int32) sections_Lc = np.empty(batch_size * n_jops, dtype=np.int32) for (i, L) in enumerate(self._jump_ops): L_xrp, L_mel_r = L.get_conn_flattened( xr, sections_Lr[i * batch_size : (i + 1) * batch_size] ) L_xcp, L_mel_c = L.get_conn_flattened( xc, sections_Lc[i * batch_size : (i + 1) * batch_size] ) L_xrps.append(L_xrp) L_xcps.append(L_xcp) L_mel_rs.append(L_mel_r) L_mel_cs.append(L_mel_c) if pad: if batch_size > 1: max_lr = np.max( np.diff(sections_Lr[i * batch_size : (i + 1) * batch_size]) ) max_lc = np.max( np.diff(sections_Lc[i * batch_size : (i + 1) * batch_size]) ) else: max_lr = sections_Lr[i * batch_size] max_lc = sections_Lc[i * batch_size] max_conns_Lrc += max_lr * max_lc # compose everything again if self._xprime_f.shape[0] < self._max_conn_size * batch_size: # refcheck=False because otherwise this errors when testing self._xprime_f.resize( self._max_conn_size * batch_size, self.hilbert.size, refcheck=False ) self._mels_f.resize(self._max_conn_size * batch_size, refcheck=False) if pad: pad = max_conns_Lrc + max_conns_r + max_conns_c else: pad = 0 self._xprime_f[:] = 0 self._mels_f[:] = 0 return self._get_conn_flattened_kernel( self._xprime_f, self._mels_f, sections, np.asarray(xr), np.asarray(xc), sections_r, sections_c, xr_prime, mels_r, xc_prime, mels_c, L_xrps, L_xcps, L_mel_rs, L_mel_cs, sections_Lr, sections_Lc, n_jops, batch_size, N, pad, )
@staticmethod @jit(nopython=True) def _get_conn_flattened_kernel( xs, mels, sections, xr, xc, sections_r, sections_c, xr_prime, mels_r, xc_prime, mels_c, L_xrps, L_xcps, L_mel_rs, L_mel_cs, sections_Lr, sections_Lc, n_jops, batch_size, N, pad, ): off = 0 n_hr_i = 0 n_hc_i = 0 n_Lr_is = np.zeros(n_jops, dtype=np.int32) n_Lc_is = np.zeros(n_jops, dtype=np.int32) for i in range(batch_size): off_i = off n_hr_f = sections_r[i] n_hr = n_hr_f - n_hr_i xs[off : off + n_hr, 0:N] = xr_prime[n_hr_i:n_hr_f, :] xs[off : off + n_hr, N : 2 * N] = xc[i, :] mels[off : off + n_hr] = -1j * mels_r[n_hr_i:n_hr_f] off += n_hr n_hr_i = n_hr_f n_hc_f = sections_c[i] n_hc = n_hc_f - n_hc_i xs[off : off + n_hc, N : 2 * N] = xc_prime[n_hc_i:n_hc_f, :] xs[off : off + n_hc, 0:N] = xr[i, :] mels[off : off + n_hc] = 1j * np.conj(mels_c[n_hc_i:n_hc_f]) off += n_hc n_hc_i = n_hc_f for j in range(n_jops): L_xrp, L_mel_r = L_xrps[j], L_mel_rs[j] L_xcp, L_mel_c = L_xcps[j], L_mel_cs[j] n_Lr_f = sections_Lr[j * batch_size + i] n_Lc_f = sections_Lc[j * batch_size + i] n_Lr_i = n_Lr_is[j] n_Lc_i = n_Lc_is[j] n_Lr = n_Lr_f - n_Lr_is[j] n_Lc = n_Lc_f - n_Lc_is[j] # start filling batches for r in range(n_Lr): xs[off : off + n_Lc, 0:N] = L_xrp[n_Lr_i + r, :] xs[off : off + n_Lc, N : 2 * N] = L_xcp[n_Lc_i:n_Lc_f, :] mels[off : off + n_Lc] = L_mel_r[n_Lr_i + r] * np.conj( L_mel_c[n_Lc_i:n_Lc_f] ) off = off + n_Lc n_Lr_is[j] = n_Lr_f n_Lc_is[j] = n_Lc_f if pad != 0: n_entries = off - off_i mels[off : off + n_entries] = 0 off = (i + 1) * pad sections[i] = off return np.copy(xs[0:off, :]), np.copy(mels[0:off])
[docs] def to_linear_operator( self, *, sparse: bool = True, append_trace: bool = False ) -> LinearOperator: r"""Returns a lazy scipy linear_operator representation of the Lindblad Super-Operator. The returned operator behaves like the M**2 x M**2 matrix obtained with to_dense/sparse, and accepts vectorised density matrices as input. Args: sparse: If True internally uses sparse matrices for the hamiltonian and jump operators, dense otherwise (default=True) append_trace: If True (default=False) the resulting operator has size M**2 + 1, and the last element of the input vector is the trace of the input density matrix. This is useful when implementing iterative methods. Returns: A linear operator taking as input vectorised density matrices and returning the product L*rho """ M = self.hilbert.physical.n_states iHnh = -1j * self.hamiltonian_nh if sparse: iHnh = iHnh.to_sparse() J_ops = [j.to_sparse() for j in self.jump_operators] J_ops_c = [ j.conjugate().transpose().to_sparse() for j in self.jump_operators ] else: iHnh = iHnh.to_dense() J_ops = [j.to_dense() for j in self.jump_operators] J_ops_c = [ j.conjugate().transpose().to_dense() for j in self.jump_operators ] if not append_trace: op_size = M**2 def matvec(rho_vec): rho = rho_vec.reshape((M, M)) drho = np.zeros((M, M), dtype=rho.dtype) drho += iHnh @ rho + rho @ iHnh.conj().T for J, J_c in zip(J_ops, J_ops_c): drho += (J @ rho) @ J_c return drho.reshape(-1) else: # This function defines the product Liouvillian x densitymatrix, without # constructing the full density matrix (passed as a vector M^2). # An extra row is added at the bottom of the therefore M^2+1 long array, # with the trace of the density matrix. This is needed to enforce the # trace-1 condition. # The logic behind the use of Hnh_dag_ and Hnh_ is derived from the # convention adopted in local_liouvillian.cc, and inspired from reference # arXiv:1504.05266 op_size = M**2 + 1 def matvec(rho_vec): rho = rho_vec[:-1].reshape((M, M)) out = np.zeros((M**2 + 1), dtype=rho.dtype) drho = out[:-1].reshape((M, M)) drho += iHnh @ rho + rho @ iHnh.conj().T for J, J_c in zip(J_ops, J_ops_c): drho += (J @ rho) @ J_c out[-1] = rho.trace() return out L = LinearOperator((op_size, op_size), matvec=matvec, dtype=iHnh.dtype) return L
[docs] def to_qobj(self) -> "qutip.liouvillian": # noqa: F821 r"""Convert the operator to a qutip's liouvillian Qobj. Returns: A `qutip.liouvillian` object. """ from qutip import liouvillian return liouvillian( self.hamiltonian.to_qobj(), [op.to_qobj() for op in self.jump_operators] )