# SimQN: a discrete-event simulator for the quantum networks
# Copyright (C) 2021-2022 Lutong Chen, Jian Li, Kaiping Xue
# University of Science and Technology of China, USTC.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import numpy as np

from qns.entity.cchannel.cchannel import ClassicChannel, RecvClassicPacket, ClassicPacket
from qns.entity.node.app import Application
from qns.entity.node.node import QNode
from qns.entity.qchannel.qchannel import QuantumChannel, RecvQubitPacket
from qns.models.qubit import Qubit
from qns.models.qubit.const import (
    BASIS_X,
    BASIS_Z,
    QUBIT_STATE_0,
    QUBIT_STATE_1,
    QUBIT_STATE_N,
    QUBIT_STATE_P,
)
from qns.simulator.event import Event, func_to_event
from qns.simulator.simulator import Simulator
from qns.utils.rnd import get_rand, get_choice
class QubitWithError(Qubit):
    """A qubit whose state is randomly rotated while it is transferred."""

    def transfer_error_model(self, length: float, decoherence_rate: float = 0, **kwargs):
        """Apply a random, length-dependent rotation to this qubit.

        The rotation angle is ``get_rand() * (length / 1000) / 50 * pi / 4``,
        so it grows linearly with ``length``.

        Args:
            length: transfer distance; it is divided by 1000 before use
                (presumably metres converted to kilometres -- confirm
                against the channel that calls this hook).
            decoherence_rate: accepted for interface compatibility; not used
                by this error model.
            **kwargs: accepted for interface compatibility; ignored.
        """
        lkm = length / 1000
        # Reference distance: at this many "lkm" units the angle is at most
        # get_rand() * pi / 4.
        standard_lkm = 50.0
        theta = get_rand() * lkm / standard_lkm * np.pi / 4
        operation = np.array(
            [[np.cos(theta), -np.sin(theta)],
             [np.sin(theta), np.cos(theta)]],
            dtype=np.complex128,
        )
        # The rotation matrix was previously built and then discarded, which
        # made this error model a no-op.
        # NOTE(review): assumes the qubit's QState exposes
        # ``operate(operator)`` for single-qubit operators -- confirm against
        # the installed SimQN version.
        self.state.operate(operation)
class BB84SendApp(Application):
    """Sender side of the BB84 exchange.

    Periodically emits a randomly prepared qubit over ``qchannel`` and, when
    the receiver reports the basis it measured in over ``cchannel``, compares
    it with the preparation basis and replies with its own basis and bit.
    """

    def __init__(self, dest: QNode, qchannel: QuantumChannel,
                 cchannel: ClassicChannel, send_rate=1000):
        """
        Args:
            dest: the node that qubits and classic replies are sent to.
            qchannel: quantum channel used to send qubits.
            cchannel: classic channel used for basis reconciliation.
            send_rate: qubits sent per second of simulated time.
        """
        # Initialise the base Application before using its add_handler().
        super().__init__()
        self.dest = dest
        self.qchannel = qchannel
        self.cchannel = cchannel
        self.send_rate = send_rate

        self.count = 0            # next qubit id to assign
        self.qubit_list = {}      # qubit id -> sent qubit
        self.basis_list = {}      # qubit id -> preparation basis
        self.measure_list = {}    # qubit id -> encoded bit (0 or 1)

        self.succ_key_pool = {}   # qubit id -> bit, for matching bases
        self.fail_number = 0      # count of basis mismatches

        self.add_handler(self.handleClassicPacket, [RecvClassicPacket], [self.cchannel])

    def install(self, node: QNode, simulator: Simulator):
        """Install the app and schedule the first qubit at ``simulator.ts``.

        Later transmissions are chained by ``send_qubit`` itself.
        """
        super().install(node, simulator)

        first_send = func_to_event(simulator.ts, self.send_qubit, by=self)
        # The event must be handed to the simulator, otherwise nothing is
        # ever sent.
        simulator.add_event(first_send)

    def handleClassicPacket(self, node: QNode, event: Event):
        """Dispatch a received classic packet to ``check_basis``."""
        return self.check_basis(event)

    def check_basis(self, event: RecvClassicPacket):
        """Compare the receiver's basis with ours and reply with basis and bit.

        The incoming message is read as a dict with keys ``"id"`` and
        ``"basis"`` (``"Z"`` or ``"X"``). On a match the bit is stored in
        ``succ_key_pool``; otherwise ``fail_number`` is incremented. In both
        cases a packet with ``"id"``, ``"basis"`` and ``"ret"`` is sent back.
        """
        msg: dict = event.packet.get()
        qubit_id = msg.get("id")
        basis_dest = msg.get("basis")

        basis_src = "Z" if (self.basis_list[qubit_id] == BASIS_Z).all() else "X"
        bit = self.measure_list[qubit_id]

        if basis_dest == basis_src:
            self.succ_key_pool[qubit_id] = bit
        else:
            self.fail_number += 1

        reply = ClassicPacket(msg={"id": qubit_id, "basis": basis_src,
                                   "ret": bit}, src=self._node, dest=self.dest)
        self.cchannel.send(reply, next_hop=self.dest)

    def send_qubit(self):
        """Prepare one random BB84 qubit, send it, and schedule the next send."""
        # One of the four BB84 states: two for the Z basis, two for the X basis.
        state = get_choice([QUBIT_STATE_0, QUBIT_STATE_1,
                            QUBIT_STATE_P, QUBIT_STATE_N])
        qubit = QubitWithError(state=state)

        is_zero = (state == QUBIT_STATE_0).all()
        is_one = (state == QUBIT_STATE_1).all()
        is_plus = (state == QUBIT_STATE_P).all()
        basis = BASIS_Z if is_zero or is_one else BASIS_X
        ret = 0 if is_zero or is_plus else 1

        qubit.id = self.count
        self.count += 1
        self.qubit_list[qubit.id] = qubit
        self.basis_list[qubit.id] = basis
        self.measure_list[qubit.id] = ret

        self.qchannel.send(qubit=qubit, next_hop=self.dest)

        # Chain the next transmission one send period later.
        next_time = self._simulator.current_time + \
            self._simulator.time(sec=1 / self.send_rate)
        next_send = func_to_event(next_time, self.send_qubit, by=self)
        self._simulator.add_event(next_send)
class BB84RecvApp(Application):
    """Receiver side of the BB84 exchange.

    Measures each incoming qubit in a randomly chosen basis, reports that
    basis to the sender over ``cchannel``, and keeps the measured bit when the
    sender's reply confirms both the basis and the bit value.
    """

    def __init__(self, src: QNode, qchannel: QuantumChannel, cchannel: ClassicChannel):
        """
        Args:
            src: the sending node; classic packets are addressed to it.
            qchannel: quantum channel on which qubits arrive.
            cchannel: classic channel used for basis reconciliation.
        """
        # Initialise the base Application before using its add_handler().
        super().__init__()
        self.src = src
        self.qchannel = qchannel
        self.cchannel = cchannel

        self.qubit_list = {}      # qubit id -> received qubit
        self.basis_list = {}      # qubit id -> measurement basis
        self.measure_list = {}    # qubit id -> measurement result

        self.succ_key_pool = {}   # qubit id -> bit, for confirmed matches
        self.fail_number = 0      # count of basis or bit mismatches

        self.add_handler(self.handleQuantumPacket, [RecvQubitPacket], [self.qchannel])
        self.add_handler(self.handleClassicPacket, [RecvClassicPacket], [self.cchannel])

    def handleQuantumPacket(self, node: QNode, event: Event):
        """Dispatch a received qubit to ``recv``."""
        return self.recv(event)

    def handleClassicPacket(self, node: QNode, event: Event):
        """Dispatch a received classic packet to ``check_basis``."""
        return self.check_basis(event)

    def check_basis(self, event: RecvClassicPacket):
        """Reconcile one qubit using the sender's basis and bit.

        The incoming message is read as a dict with keys ``"id"``,
        ``"basis"`` (``"Z"`` or ``"X"``) and ``"ret"``. The local bit is
        stored in ``succ_key_pool`` only when both basis and bit agree;
        otherwise ``fail_number`` is incremented.
        """
        msg: dict = event.packet.get()
        qubit_id = msg.get("id")
        basis_src = msg.get("basis")
        ret_src = msg.get("ret")

        basis_dest = "Z" if (self.basis_list[qubit_id] == BASIS_Z).all() else "X"
        ret_dest = self.measure_list[qubit_id]

        if basis_dest == basis_src and ret_dest == ret_src:
            self.succ_key_pool[qubit_id] = ret_dest
        else:
            self.fail_number += 1

    def recv(self, event: RecvQubitPacket):
        """Measure the incoming qubit in a random basis and report the basis."""
        qubit: Qubit = event.qubit

        # Randomly choose the Z or X basis for this measurement.
        basis = get_choice([BASIS_Z, BASIS_X])
        use_z = (basis == BASIS_Z).all()
        basis_msg = "Z" if use_z else "X"
        ret = qubit.measureZ() if use_z else qubit.measureX()

        self.qubit_list[qubit.id] = qubit
        self.basis_list[qubit.id] = basis
        self.measure_list[qubit.id] = ret

        # Only the basis is disclosed; the measured bit stays local.
        packet = ClassicPacket(
            msg={"id": qubit.id, "basis": basis_msg}, src=self._node, dest=self.src)
        self.cchannel.send(packet, next_hop=self.src)