# SimQN: a discrete-event simulator for the quantum networks
# Copyright (C) 2021-2022 Lutong Chen, Jian Li, Kaiping Xue
# University of Science and Technology of China, USTC.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Any, List, Optional, Union
from qns.entity.entity import Entity
from qns.entity.node.node import QNode
from qns.models.delay.constdelay import ConstantDelayModel
from qns.models.delay.delay import DelayModel
from qns.simulator.simulator import Simulator
from qns.simulator.ts import Time
from qns.simulator.event import Event
from qns.models.core.backend import QuantumModel
import qns.utils.log as log
from qns.utils.rnd import get_rand
class QuantumChannel(Entity):
    """
    QuantumChannel is the channel for transmitting qubits between QNodes.

    A channel can limit its sending rate (``bandwidth``), bound the backlog it
    tolerates (``max_buffer_size``), randomly lose qubits (``drop_rate``) and
    delay delivery (``delay``). Delivery is signalled by scheduling a
    ``RecvQubitPacket`` event on the simulator.
    """
    def __init__(self, name: Optional[str] = None, node_list: Optional[List[QNode]] = None,
                 bandwidth: int = 0, delay: Union[float, DelayModel] = 0, drop_rate: float = 0,
                 max_buffer_size: int = 0, length: float = 0, decoherence_rate: Optional[float] = 0,
                 transfer_error_model_args: Optional[dict] = None):
        """
        Args:
            name (str): the name of this channel
            node_list (List[QNode]): a list of QNodes that it connects to.
                The list is copied; ``None`` means no node is connected yet.
            bandwidth (int): the qubit per second on this channel. 0 represents unlimited
            delay (float): the time delay for transmitting a packet, or a ``DelayModel``
            drop_rate (float): the drop rate
            max_buffer_size (int): the max buffer size.
                If it is full, the next coming packet will be dropped. 0 represents unlimited.
            length (float): the length of this channel
            decoherence_rate: the decoherence rate that will pass to the transfer_error_model
            transfer_error_model_args (dict): the parameters that pass to the transfer_error_model.
                ``None`` means no extra parameters.
        """
        super().__init__(name=name)
        # ``None`` stands in for "empty" so that no mutable default object is
        # shared between independently constructed channels.
        self.node_list = [] if node_list is None else node_list.copy()
        self.bandwidth = bandwidth
        # A plain number is wrapped so that ``send`` can always call ``calculate()``.
        self.delay_model = delay if isinstance(delay, DelayModel) else ConstantDelayModel(delay=delay)
        self.drop_rate = drop_rate
        self.max_buffer_size = max_buffer_size
        self.length = length
        self.decoherence_rate = decoherence_rate
        # A caller-supplied dict is kept by reference (as before); only the
        # default case gets a fresh dict per channel.
        self.transfer_error_model_args = {} if transfer_error_model_args is None \
            else transfer_error_model_args

    def install(self, simulator: Simulator) -> None:
        '''
        ``install`` is called before ``simulator`` runs to initialize or set initial events.

        The first call stores the simulator and initializes the next free
        sending slot from ``simulator.ts``; later calls do nothing.

        Args:
            simulator (Simulator): the simulator
        '''
        # NOTE(review): ``_is_installed`` is not set in this class; presumably
        # it is initialized by ``Entity.__init__`` - confirm.
        if not self._is_installed:
            self._simulator = simulator
            self._next_send_time = self._simulator.ts
            self._is_installed = True

    def send(self, qubit: QuantumModel, next_hop: QNode) -> None:
        """
        Send a qubit to the next_hop.

        The qubit may be dropped (only a debug message is logged) when the
        backlog exceeds ``max_buffer_size`` or by the random ``drop_rate``
        check. Otherwise the transfer error model is applied to the qubit and
        a ``RecvQubitPacket`` event is scheduled for ``next_hop``.

        Args:
            qubit (QuantumModel): the transmitting qubit
            next_hop (QNode): the next hop QNode
        Raises:
            NextHopNotConnectionException: the next_hop is not connected to this channel
        """
        if next_hop not in self.node_list:
            raise NextHopNotConnectionException(
                f"{next_hop} is not connected to {self}")

        now = self._simulator.current_time
        if self.bandwidth != 0:
            # The qubit leaves at the next free slot, but never in the past.
            if self._next_send_time <= now:
                send_time = now
            else:
                send_time = self._next_send_time

            # Waiting longer than the time needed to send ``max_buffer_size``
            # qubits at ``bandwidth`` means the buffer would overflow.
            if self.max_buffer_size != 0 and send_time > now\
                    + self._simulator.time(sec=self.max_buffer_size / self.bandwidth):
                # buffer is overflow
                log.debug(f"qchannel {self}: drop qubit {qubit} due to overflow")
                return

            # Reserve the slot. This happens before the random drop below, so a
            # randomly dropped qubit still occupies its sending slot.
            self._next_send_time = send_time + self._simulator.time(sec=1 / self.bandwidth)
        else:
            send_time = now

        # random drop
        if get_rand() < self.drop_rate:
            log.debug(f"qchannel {self}: drop qubit {qubit} due to drop rate")
            return

        # add delay
        recv_time = send_time + self._simulator.time(sec=self.delay_model.calculate())

        # operation on the qubit: applied now, when ``send`` is called, not
        # when the receive event fires
        qubit.transfer_error_model(self.length, self.decoherence_rate, **self.transfer_error_model_args)
        send_event = RecvQubitPacket(recv_time, name=None, by=self, qchannel=self,
                                     qubit=qubit, dest=next_hop)
        self._simulator.add_event(send_event)

    def __repr__(self) -> str:
        # Named channels print as ``<qchannel NAME>``; unnamed ones fall back
        # to the parent class representation.
        if self.name is not None:
            return f"<qchannel {self.name}>"
        return super().__repr__()
class NextHopNotConnectionException(Exception):
    """
    Raised by ``QuantumChannel.send`` when the requested next hop is not in
    the channel's ``node_list``.
    """
    pass
class RecvQubitPacket(Event):
    """
    The event for a QNode to receive a qubit sent through a QuantumChannel
    """
    def __init__(self, t: Optional[Time] = None, qchannel: Optional[QuantumChannel] = None,
                 qubit: Optional[QuantumModel] = None, dest: Optional[QNode] = None,
                 name: Optional[str] = None, by: Optional[Any] = None):
        """
        Args:
            t (Time): the time at which the event occurs
            qchannel (QuantumChannel): the channel that carried the qubit
            qubit (QuantumModel): the received qubit
            dest (QNode): the node that receives the qubit
            name (str): the name of this event
            by: the entity that generated this event
        """
        super().__init__(t=t, name=name, by=by)
        self.qchannel = qchannel
        self.qubit = qubit
        self.dest = dest

    def invoke(self) -> None:
        """
        Deliver this event to the destination node by calling ``dest.handle``
        """
        self.dest.handle(self)