Source code for qns.utils.multiprocess

#    SimQN: a discrete-event simulator for the quantum networks
#    Copyright (C) 2021-2022 Lutong Chen, Jian Li, Kaiping Xue
#    University of Science and Technology of China, USTC.
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <https://www.gnu.org/licenses/>.

import itertools
import multiprocessing
from typing import Optional, Dict
import pandas as pd
from qns.utils.log import logger as log

import signal


[docs]class MPSimulations(): """ MultiProcessSimulations will help users to perfrom multiple simulations with different experiment settings and leverage multiple processes. """ def __init__(self, settings: Dict = {}, iter_count: int = 1, aggregate: bool = True, cores: int = -1, name: Optional[str] = None) -> None: """ Args: settings: a dictionary object that contains simulation settings, e.g., {"node_num": [10, 20, 30], "request_num": [1, 2, 3], "memory_size": [50, 100]} iter_count (int): for each setting, the repeat number of the same experiments. (with different random seed) aggregate (bool): aggregate experiments with the same settings and calculate mean and std cores (int): the number of CPUs, default is -1 means to use all CPUs. name (str): the name of this simulation. """ self.settings = settings self.iter_count = iter_count self.name = name self.aggregate = aggregate self.cores = cores if cores > 0 else multiprocessing.cpu_count() self.data = pd.DataFrame() self.aggregated_data = pd.DataFrame() self._setting_list = [] self._current_simulation_count = 0 self._total_simulation_count = 0 self._start_time = None self._end_time = None
[docs] def run(self, setting: Dict = {}) -> Dict: """ This function should be overwited by users to provide codes that run a single simulation. Args: setting (Dict): the simulation setting, e.g. {'node_num': 10, 'req_num': 10, 'memory_size': 50} Returns: a dictionary that contains all results, e.g. {'throughput': 100, 'fidelity': 0.88} """ raise NotImplementedError return {}
def _single_run(self, setting: Dict = {}): raw = {} log.info(f"start simulation [{setting['_id']+1}/{self._total_simulation_count}] {setting}") result = self.run(setting=setting) raw.update(setting) raw.update(result) log.info(f"finish simulation [{setting['_id']+1}/{self._total_simulation_count}] {result}") return raw def _init_worker(self): signal.signal(signal.SIGINT, signal.SIG_IGN)
[docs] def start(self): """ Start the multiple process simulation """ self.prepare_setting() pool = multiprocessing.Pool(processes=self.cores, initializer=self._init_worker) try: result = [] for setting in self._setting_list: result.append(pool.apply_async(self._single_run, (setting,))) pool.close() pool.join() except KeyboardInterrupt: print("terminating simulation") pool.terminate() pool.join() for r in result: try: raw_data = r.get() except Exception: print("[simulator] error in simulation") continue new_result = {} for k, v in raw_data.items(): new_result[k] = [v] result_pd = pd.DataFrame(new_result) self.data = pd.concat([self.data, result_pd], ignore_index=True) if self.aggregate: mean = pd.DataFrame(self.data).drop(columns=["_id", "_group", "_repeat"]) new_name = {} ck = mean.columns for k in ck: if k not in self.settings.keys(): new_name[k] = k+"_mean" mean = mean.rename(columns=new_name) mean = mean.groupby(by=list(self.settings.keys())).mean() std = pd.DataFrame(self.data).drop(columns=["_id", "_group", "_repeat"]) new_name = {} ck = std.columns for k in ck: if k not in self.settings.keys(): new_name[k] = k+"_std" std = std.rename(columns=new_name) std = std.groupby(by=list(self.settings.keys())).std() self.aggregated_data = pd.merge(mean, std, on=list(self.settings.keys()))
[docs] def prepare_setting(self): """ Generate the experiment setting for each experiments. """ keys = self.settings.keys() _tmp = [] for k in keys: _tmp.append(self.settings.get(k, [])) id_count = 0 for setting in itertools.product(*_tmp): for j in range(self.iter_count): setting_dict = {} for i, k in enumerate(keys): setting_dict[k] = setting[i] setting_dict["_repeat"] = j setting_dict["_group"] = id_count setting_dict["_id"] = id_count * self.iter_count + j self._setting_list.append(setting_dict) id_count += 1 self._total_simulation_count = len(self._setting_list) self._current_simulation_count = 0
[docs] def get_data(self): """ Get the simulation results Returns: a result data in pd.DataFrame """ return self.aggregated_data if self.aggregate else self.data
[docs] def get_raw_data(self): """ Get the original raw results, no matter aggregate is ``True`` or not. Returns: a result data in pd.DataFrame """ return self.data