Source code for app.domain.master_servers

"""This module contains domain specific classes that coordinate all
:py:mod:`app.domain.cluster_groups` of a simulation instance. These could
simulate centralized authentication servers, file localization or
file metadata servers or a bank of currently online and offline
:py:mod:`storage nodes <app.domain.network_nodes>`."""
from __future__ import annotations

import os
import json
import math
import datetime
from typing import Union, Dict, Any, Optional, List

import type_hints as th
import numpy as np
import environment_settings as es

from utils.convertions import class_name_to_obj
from domain.helpers.smart_dataclasses import FileBlockData

# Type alias for the ``persisting`` section of a simulation file: it maps a
# file name to that file's options (this module reads the ``spread`` and
# ``cluster_size`` keys). It has to be bound with ``=``; a bare annotation
# (``name: Type``) declares a variable without ever creating the alias.
_PersistentingDict = Dict[str, Dict[str, Union[List[str], str]]]


class Master:
    """Simulation manager class, some kind of puppet-master.

    Could represent an authentication server or a monitor that decides,
    along with other ``Master`` entities, what
    :py:class:`network nodes <app.domain.network_nodes.Node>` are online
    using consensus algorithms.

    Attributes:
        MAX_EPOCHS (Optional[int]):
            Number of epochs of the simulation. Written on the ``Master``
            class itself by :py:meth:`__init__`, hence shared by every
            instance and subclass (the last constructed instance wins).
        MAX_EPOCHS_PLUS_ONE (Optional[int]):
            ``MAX_EPOCHS + 1``; the exclusive upper bound used by
            :py:meth:`execute_simulation`.
        origin (str):
            The name of the simulation file that started the simulation
            process.
        sim_id (int):
            Identifier that generates unique output file names, thus
            guaranteeing that different simulation instances do not
            overwrite previous out files.
        epoch (int):
            The simulation's current epoch. Starts at ``1``.
        cluster_groups (:py:class:`app.type_hints.ClusterDict`):
            A collection of
            :py:class:`cluster groups <app.domain.cluster_groups.Cluster>`
            managed by the ``Master``. Keys are
            :py:attr:`cluster identifiers
            <app.domain.cluster_groups.Cluster.id>` and values are the
            cluster instances.
        network_nodes (:py:class:`app.type_hints.NodeDict`):
            A dictionary mapping
            :py:attr:`node identifiers <app.domain.network_nodes.Node.id>`
            to their instance objects. It differs from
            :py:attr:`app.domain.cluster_groups.Cluster.members` in the
            sense that ``network_nodes`` includes all nodes, both online
            and offline, available on the entire distributed backup
            storage system regardless of their participation in any
            :py:class:`cluster group <app.domain.cluster_groups.Cluster>`.
    """

    MAX_EPOCHS: Optional[int] = None
    MAX_EPOCHS_PLUS_ONE: Optional[int] = None

    def __init__(self,
                 simfile_name: str,
                 sid: int,
                 epochs: int,
                 cluster_class: str,
                 node_class: str) -> None:
        """Instantiates a ``Master`` object and sets the simulation up.

        Args:
            simfile_name:
                Name of the simulation file to be run by the simulator.
                It is resolved relative to
                :py:const:`~app.environment_settings.SIMULATION_ROOT`.
            sid:
                Identifier that generates unique output file names, thus
                guaranteeing that different simulation instances do not
                overwrite previous out files. Stored as ``sim_id``.
            epochs:
                The number of discrete time steps the simulation lasts.
            cluster_class:
                The name of the class used to instantiate cluster group
                instances through reflection. See
                :py:mod:`cluster groups module <app.domain.cluster_groups>`.
            node_class:
                The name of the class used to instantiate network node
                instances through reflection. See
                :py:mod:`network nodes module <app.domain.network_nodes>`.
        """
        # Deliberately written on ``Master`` (not ``type(self)``) so the
        # values are visible through the base class from anywhere.
        Master.MAX_EPOCHS = epochs
        Master.MAX_EPOCHS_PLUS_ONE = epochs + 1

        self.origin = simfile_name
        self.sim_id = sid
        self.epoch = 1
        self.cluster_groups: th.ClusterDict = {}
        self.network_nodes: th.NodeDict = {}

        simfile_path: str = os.path.join(es.SIMULATION_ROOT, simfile_name)
        self._process_simfile(simfile_path, cluster_class, node_class)

    # region Simulation setup
    def _process_simfile(
            self, path: str, cluster_class: str, node_class: str) -> None:
        """Opens and processes the simulation file referenced in ``path``.

        The JSON data in the file is used to create the network nodes and,
        for every entry of its ``persisting`` section, one cluster group.
        Each persisted file is split into blocks and, once every cluster
        exists, the initial
        :py:meth:`file spreading
        <app.domain.cluster_groups.Cluster.spread_files>` is triggered on
        each cluster with the file's configured ``spread`` strategy.

        The block size is ``filesize // BLOCKS_COUNT``, clamped to at least
        one byte. Without the clamp a file smaller than ``BLOCKS_COUNT``
        bytes would get a block size of zero and be split into no blocks.

        Args:
            path:
                The path to the simulation file. Including extension and
                parent folders.
            cluster_class:
                The name of the class used to instantiate cluster group
                instances through reflection. See
                :py:mod:`app.domain.cluster_groups`.
            node_class:
                The name of the class used to instantiate network node
                instances through reflection. See
                :py:mod:`app.domain.network_nodes`.
        """
        # JSON text is UTF-8; do not depend on the platform's locale.
        with open(path, encoding="utf-8") as input_file:
            simfile_json: Any = json.load(input_file)

        self._create_network_nodes(simfile_json, node_class)

        fspreads: Dict[str, str] = {}
        fblocks: Dict[str, th.ReplicasDict] = {}
        persisting: _PersistentingDict = simfile_json['persisting']
        for fname, options in persisting.items():
            fspreads[fname] = options['spread']
            cluster = self._new_cluster_group(
                cluster_class, options['cluster_size'], fname)

            filesize = os.path.getsize(os.path.join(es.SHARED_ROOT, fname))
            es.set_blocks_size(max(1, filesize // es.BLOCKS_COUNT))
            fblocks[fname] = self._split_files(fname, cluster, es.BLOCKS_SIZE)

        # Distribute files before starting simulation
        for cluster in self.cluster_groups.values():
            stored_file = cluster.file.name
            cluster.spread_files(fblocks[stored_file], fspreads[stored_file])

    def _create_network_nodes(
            self, json: Dict[str, Any], node_class: str) -> None:
        """Helper method that instantiates all
        :py:class:`network nodes <app.domain.network_nodes.Node>` that are
        specified in the simulation file.

        Every ``(identifier, uptime)`` pair found under the
        ``nodes_uptime`` key is turned into a node and registered in
        :py:attr:`network_nodes`.

        Args:
            json:
                The simulation file in JSON dictionary object format.
                Note that, inside this method, the parameter shadows the
                ``json`` module.
            node_class:
                The type of network node to create.
        """
        for nid, nuptime in json['nodes_uptime'].items():
            self.network_nodes[nid] = self._new_network_node(
                node_class, nid, nuptime)

    def _split_files(
            self, fname: str, cluster: th.ClusterType, bsize: int
    ) -> th.ReplicasDict:
        """Helper method that splits a file into multiple blocks to be
        persisted in a
        :py:class:`cluster group <app.domain.cluster_groups.Cluster>`.

        As a side effect ``cluster.file.parts_count`` is set to the number
        of blocks that were created.

        Args:
            fname:
                The name of the file located in
                :py:const:`~app.environment_settings.SHARED_ROOT` folder
                to be read and split.
            cluster (:py:class:`~app.type_hints.ClusterType`):
                A reference to the
                :py:class:`cluster group
                <app.domain.cluster_groups.Cluster>` whose
                :py:attr:`~app.domain.cluster_groups.Cluster.members` will
                be responsible for ensuring the file specified in
                ``fname`` becomes durable.
            bsize:
                The maximum amount of bytes each file block can have.

        Returns:
            :py:class:`~app.type_hints.ReplicasDict`:
                A dictionary in which the keys are integers and values are
                :py:class:`file blocks
                <app.domain.helpers.smart_dataclasses.FileBlockData>`,
                whose attribute
                :py:attr:`~app.domain.helpers.smart_dataclasses.FileBlockData.number`
                is the key. Block numbers start at ``1``.
        """
        blocks: th.ReplicasDict = {}
        with open(os.path.join(es.SHARED_ROOT, fname), "rb") as file:
            # Keep reading ``bsize`` bytes until an empty read signals EOF.
            chunks = iter(lambda: file.read(bsize), b"")
            for number, chunk in enumerate(chunks, start=1):
                blocks[number] = FileBlockData(
                    cluster.id, fname, number, chunk)
        cluster.file.parts_count = len(blocks)
        return blocks
    # endregion

    # region Simulation steps
    def execute_simulation(self) -> None:
        """Starts the simulation processes.

        Epochs are executed until :py:attr:`epoch` reaches
        ``MAX_EPOCHS_PLUS_ONE`` or no cluster group is left. In each epoch
        every cluster executes the epoch; clusters that stopped running
        write their output file and are removed from
        :py:attr:`cluster_groups` once the whole pass is over. The elapsed
        wall-clock time, in whole seconds, is printed at the end.
        """
        start_time = datetime.datetime.now()
        while self.epoch < Master.MAX_EPOCHS_PLUS_ONE and self.cluster_groups:
            print(f"epoch: {self.epoch}")
            terminated_clusters: List[str] = []
            for cluster in self.cluster_groups.values():
                cluster.execute_epoch(self.epoch)
                if not cluster.running:
                    terminated_clusters.append(cluster.id)
                    cluster.file.jwrite(cluster, self.origin, self.epoch)
            # Removal is deferred so that the dictionary is not mutated
            # while it is being iterated.
            for cid in terminated_clusters:
                print(f"Cluster: {cid} terminated at epoch {self.epoch}")
                self.cluster_groups.pop(cid)
            self.epoch += 1
        finish_time = datetime.datetime.now()
        delta_time = int((finish_time - start_time).total_seconds())
        print(f"Master ({self.origin}_{self.sim_id}) exec time: {delta_time}")
    # endregion

    # region Master API
    def find_online_nodes(
            self, n: int = 1, blacklist: Optional[th.NodeDict] = None
    ) -> th.NodeDict:
        """Finds ``n``
        :py:class:`network nodes <app.domain.network_nodes.Node>` who are
        currently registered at the ``Master`` and whose status is online.

        Args:
            n:
                How many
                :py:class:`network node <app.domain.network_nodes.Node>`
                references the requesting entity wants to find.
            blacklist (:py:class:`~app.type_hints.NodeDict`):
                A collection of
                :py:attr:`nodes identifiers
                <app.domain.network_nodes.Node.id>` and their object
                instances, which specify nodes the requesting entity has
                no interest in. Only the keys are consulted.

        Returns:
            :py:class:`~app.type_hints.NodeDict`:
                A collection of
                :py:class:`network nodes <app.domain.network_nodes.Node>`
                which is at most as big as ``n`` and does not include any
                node named in ``blacklist``. Empty when ``n < 1``.
        """
        selected: th.NodeDict = {}
        if n < 1:
            return selected

        excluded = {} if blacklist is None else blacklist
        # Iterate a snapshot of the registry rather than the live dict.
        for node in list(self.network_nodes.values()):
            if len(selected) >= n:
                break
            if node.id not in excluded and node.is_up():
                selected[node.id] = node
        return selected
    # endregion

    # region Helpers
    def _new_cluster_group(
            self, cluster_class: str, size: int, fname: str
    ) -> th.ClusterType:
        """Helper method that initializes a new Cluster group.

        The members are ``size`` distinct nodes drawn at random, without
        replacement, from :py:attr:`network_nodes`. The new cluster is
        registered in :py:attr:`cluster_groups` under its ``id``.

        Args:
            cluster_class:
                The name of the class used to instantiate cluster group
                instances through reflection. See
                :py:mod:`cluster groups module <app.domain.cluster_groups>`.
            size:
                The
                :py:class:`cluster's <app.domain.cluster_groups.Cluster>`
                initial membership size.
            fname:
                The name of the file being stored in the cluster.

        Returns:
            :py:class:`~app.type_hints.ClusterType`:
                The :py:class:`~app.domain.cluster_groups.Cluster`
                instance.

        Raises:
            ValueError:
                Raised by ``numpy.random.choice`` when ``size`` is larger
                than the number of registered network nodes.
        """
        chosen_ids = np.random.choice(
            a=tuple(self.network_nodes), size=size, replace=False)
        cluster_members: th.NodeDict = {
            node_id: self.network_nodes[node_id] for node_id in chosen_ids
        }
        cluster = class_name_to_obj(
            es.CLUSTER_GROUPS,
            cluster_class,
            [self, fname, cluster_members, self.sim_id, self.origin]
        )
        self.cluster_groups[cluster.id] = cluster
        return cluster

    def _new_network_node(
            self, node_class: str, nid: str, node_uptime: str
    ) -> th.NodeType:
        """Helper method that initializes a new Node.

        Args:
            node_class:
                The name of the class used to instantiate network node
                instances through reflection. See
                :py:mod:`network nodes module <app.domain.network_nodes>`.
            nid:
                An id that uniquely identifies the
                :py:class:`network node <app.domain.network_nodes.Node>`.
            node_uptime:
                A float value in string representation that defines the
                uptime of the network node.

        Returns:
            :py:class:`~app.type_hints.NodeType`:
                The :py:class:`~app.domain.network_nodes.Node` instance.
        """
        return class_name_to_obj(
            es.NETWORK_NODES, node_class, [nid, node_uptime])
# endregion
class SGMaster(Master):
    """``Master`` specialization that can hand out a cloud reference."""

    def get_cloud_reference(self) -> str:
        """Obtains a reference to a 3rd party cloud storage provider.

        The cloud storage provider can be used to temporarily host files
        belonging to :py:class:`clusters <app.domain.SGCluster>` in bad
        conditions that may compromise the durability of the files they
        are responsible for persisting.

        Note:
            This method is virtual.

        Returns:
            A pointer to the cloud server, e.g., an IP Address. This
            implementation always yields an empty string.
        """
        cloud_reference: str = ""
        return cloud_reference
# endregion
class HDFSMaster(Master):
    """``Master`` specialization whose setup splits files into fixed 1MB
    blocks and ignores the spread strategy of the simulation file."""

    # region Simulation setup
    def _process_simfile(
            self, path: str, cluster_class: str, node_class: str) -> None:
        """Opens and processes the simulation file referenced in `path`.

        Overrides:
            :py:meth:`app.domain.master_servers.Master._process_simfile`.

            The method mirrors the overridden one except that
            :py:meth:`~app.domain.master_servers.Master._split_files` is
            invoked with fixed `bsize` = 1MB. The reason for this is
            two-fold:

                - The default and, thus recommended, block size for the \
                hadoop distributed file system is 128MB. The system is \
                not designed to perform well with small file blocks, but \
                SG requires many file blocks to work, hence being more \
                effective with small block sizes.
                - Hadoop limits the minimum block size to be 1MB, \
                `dfs.namenode.fs-limits.min-block-size <https://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml#dfs.namenode.fs-limits.min-block-size>`_. \
                For this reason, we make HDFSMaster split files into 1MB \
                chunks, as that is the closest we would get to our \
                Hive's default block size in the real world.

            The other difference is that the spread strategy is ignored:
            the ``spread`` entry of the simulation file is not read and
            ``spread_files`` is called with the file blocks only. We are
            not interested in knowing if the way the files are initially
            spread affects the time it takes for clusters to achieve a
            steady-state distribution since in HDFS
            :py:class:`file block replicas
            <app.domain.helpers.smart_dataclasses.FileBlockData>` are
            stationary on data nodes until they die.

        Args:
            path:
                The path to the simulation file. Including extension and
                parent folders.
            cluster_class:
                The name of the class used to instantiate cluster group
                instances through reflection. See
                :py:mod:`app.domain.cluster_groups`.
            node_class:
                The name of the class used to instantiate network node
                instances through reflection. See
                :py:mod:`app.domain.network_nodes`.
        """
        one_megabyte = 1024 * 1024

        # JSON text is UTF-8; do not depend on the platform's locale.
        with open(path, encoding="utf-8") as input_file:
            simfile_json: Any = json.load(input_file)

        self._create_network_nodes(simfile_json, node_class)

        fblocks: Dict[str, th.ReplicasDict] = {}
        persisting: _PersistentingDict = simfile_json['persisting']
        for fname, options in persisting.items():
            cluster = self._new_cluster_group(
                cluster_class, options['cluster_size'], fname)

            filesize = os.path.getsize(os.path.join(es.SHARED_ROOT, fname))
            es.set_blocks_size(one_megabyte)  # 1MB blocks.
            # The block count follows from the fixed block size.
            es.set_blocks_count(math.ceil(filesize / es.BLOCKS_SIZE))
            fblocks[fname] = self._split_files(fname, cluster, es.BLOCKS_SIZE)

        # Distribute files before starting simulation
        for cluster in self.cluster_groups.values():
            cluster.spread_files(fblocks[cluster.file.name])
# endregion
class NewscastMaster(Master):
    """``Master`` specialization used with the Newscast cluster and node
    classes; after the regular setup it wires every cluster group."""

    def __init__(self,
                 simfile_name: str,
                 sid: int,
                 epochs: int,
                 cluster_class: str,
                 node_class: str) -> None:
        """Instantiates the ``Master`` and then calls ``wire_k_out`` on
        every cluster group created during setup.

        Args:
            simfile_name:
                Name of the simulation file to be run by the simulator.
            sid:
                Identifier that generates unique output file names.
            epochs:
                The number of discrete time steps the simulation lasts.
            cluster_class:
                The name of the class used to instantiate cluster group
                instances through reflection.
            node_class:
                The name of the class used to instantiate network node
                instances through reflection.
        """
        super().__init__(simfile_name, sid, epochs, cluster_class, node_class)
        for cluster in self.cluster_groups.values():
            cluster.wire_k_out()

    # region Simulation setup
    def _process_simfile(
            self, path: str, cluster_class: str, node_class: str) -> None:
        """Opens and processes the simulation file referenced in `path`.

        Overrides:
            :py:meth:`app.domain.master_servers.Master._process_simfile`.

            Newscast is a gossip-based P2P network. We assume
            erasure-coding would be used in this scenario and thus, for
            simplicity, we divide the specified file into blocks of
            ``ceil(filesize / N)`` bytes, where ``N`` is the
            ``cluster_size`` configured for that file in the simulation
            file. Each cluster spreads its blocks as soon as it is
            created.

        Note:
            This class,
            :py:class:`~app.domain.cluster_groups.NewscastCluster` and
            :py:class:`~app.domain.network_nodes.NewscastNode` were
            created to test our simulator's performance, concerning the
            amount of supported simultaneous network nodes in a
            simulation. We do not actually care if the created file
            blocks are lost as the
            :py:class:`network nodes
            <app.domain.network_nodes.NewscastNode>` job in the
            simulation is to carry out the protocol defined in
            `PeerSim's AverageFunction
            <http://peersim.sourceforge.net/doc/index.html>`_.

            `PeerSim <http://peersim.sourceforge.net/>`_ uses
            configuration ``Example 2`` provided in release 1.0.5, as a
            means of testing the simulator performance, according to this
            `Ms.C. dissertation by J. Neto
            <https://www.gsd.inesc-id.pt/~lveiga/papers/msc-supervised-thesis-abstracts/jneto-FINAL.pdf>`_.
            This configuration uses Newscast protocol with
            AverageFunction and periodic monitoring of the system state.
            We implement our version of
            `Adaptive Peer Sampling with Newscast
            <https://dl.acm.org/doi/abs/10.1007/978-3-642-03869-3_50>`_
            by N. Tölgyesi and M. Jelasity, to avoid the effort of
            translating PeerSim's code.

        Args:
            path:
                The path to the simulation file. Including extension and
                parent folders.
            cluster_class:
                The name of the class used to instantiate cluster group
                instances through reflection. See
                :py:mod:`app.domain.cluster_groups`.
            node_class:
                The name of the class used to instantiate network node
                instances through reflection. See
                :py:mod:`app.domain.network_nodes`.
        """
        # JSON text is UTF-8; do not depend on the platform's locale.
        with open(path, encoding="utf-8") as input_file:
            simfile_json: Any = json.load(input_file)

        self._create_network_nodes(simfile_json, node_class)

        persisting: _PersistentingDict = simfile_json['persisting']
        for fname, options in persisting.items():
            spread_strategy = options['spread']
            cluster_size = options['cluster_size']
            cluster = self._new_cluster_group(
                cluster_class, cluster_size, fname)

            file_path = os.path.join(es.SHARED_ROOT, fname)
            # ``math.ceil`` already returns an ``int``.
            block_size = math.ceil(os.path.getsize(file_path) / cluster_size)
            file_blocks = self._split_files(fname, cluster, block_size)
            cluster.spread_files(file_blocks, spread_strategy)
# endregion