Source code for app.domain.network_nodes

"""This module contains domain specific classes that represent network nodes
responsible for the storage of :py:class:`file blocks
<app.domain.helpers.smart_dataclasses.FileBlockData>`. These could be
reliable servers or P2P nodes."""
from __future__ import annotations

import sys
import math
import random
import traceback
from typing import Union, Dict, Optional, Any

import domain.helpers.smart_dataclasses as sd
import domain.helpers.enums as e
import domain.master_servers as ms
import type_hints as th
import pandas as pd
import numpy as np
import environment_settings as es

from utils import crypto

_NetworkView: Dict[Union[str, Node], int]


class Node:
    """This class contains basic network node functionality that should
    always be useful.

    Attributes:
        id (str):
            A unique identifier for the ``Node`` instance.
        uptime (float):
            The number of epochs the ``Node`` is expected to remain online
            without disconnecting. It is derived from an availability
            percentage at construction time and decremented by
            :py:meth:`update_status`.

            Note:
                Current implementation expects ``network nodes`` joining a
                :py:class:`cluster group <app.domain.cluster_groups.Cluster>`
                to remain online for approximately:
                ``time_to_live`` = availability *
                :py:attr:`~app.domain.master_servers.Master.MAX_EPOCHS`.
                However, a ``network node`` who belongs to multiple
                :py:class:`cluster groups <app.domain.cluster_groups.Cluster>`
                may disconnect earlier than that, i.e., ``network nodes``
                remain online ``time_to_live`` after their first operation
                on the distributed backup system.
        status (:py:class:`app.domain.helpers.enums.Status`):
            Indicates if the ``Node`` instance is online or offline. In
            later releases this could also contain a 'suspect' status.
        suspicious_replies (:py:class:`~py:set`):
            Collection that contains
            :py:class:`http codes <app.domain.helpers.enums.HttpCodes>`
            that when received, trigger complaints to monitors about the
            replier.
        files (Dict[str, :py:class:`~app.type_hints.ReplicasDict`]):
            A dictionary mapping file names to dictionaries of file block
            numbers and their respective contents, i.e., the
            :py:class:`file block replicas
            <app.domain.helpers.smart_dataclasses.FileBlockData>` hosted at
            the ``Node``.
    """

    def __init__(self, uid: str, uptime: float) -> None:
        """Instantiates a ``Node`` object.

        These are network nodes responsible for persisting
        :py:class:`file block replicas
        <app.domain.helpers.smart_dataclasses.FileBlockData>`.

        Args:
            uid:
                An unique identifier for the ``Node`` instance.
            uptime:
                The availability of the ``Node`` instance. ``1.0`` means the
                node never expires; any other value is multiplied by
                :py:attr:`~app.domain.master_servers.Master.MAX_EPOCHS` and
                floored to obtain the node's time to live in epochs.
        """
        self.id: str = uid
        if uptime == 1.0:
            # Fully available nodes never go offline on their own.
            ttl = float('inf')
        else:
            ttl = math.floor(uptime * ms.Master.MAX_EPOCHS)
        # NOTE: despite its name, ``uptime`` holds the remaining time to
        # live in epochs, not the availability ratio received as argument.
        self.uptime: float = ttl
        self.status: int = e.Status.ONLINE
        self.suspicious_replies = {
            e.HttpCodes.NOT_FOUND,
            e.HttpCodes.TIME_OUT,
            e.HttpCodes.SERVER_DOWN,
        }
        self.files: Dict[str, th.ReplicasDict] = {}

    # region Simulation steps
    def update_status(self) -> int:
        """Used to update the time to live of the node instance.

        When invoked on an online node, one epoch is consumed from
        :py:attr:`uptime`; once it reaches zero the node goes offline.

        Returns:
            :py:class:`~app.domain.helpers.enums.Status`:
                The status of the ``Node``.
        """
        if self.is_up():
            self.uptime -= 1
            if self.uptime <= 0:
                self.status = e.Status.OFFLINE
        return self.status

    def execute_epoch(self, cluster: th.ClusterType, fid: str) -> None:
        """Instructs the ``Node`` instance to execute the epoch.

        Args:
            cluster (:py:class:`~app.type_hints.ClusterType`):
                A reference to the
                :py:class:`~app.domain.cluster_groups.Cluster` that invoked
                the ``Node`` method.
            fid:
                The :py:attr:`file name identifier
                <app.domain.helpers.smart_dataclasses.FileData.name>` of the
                file being simulated.

        Raises:
            NotImplementedError:
                When children of ``Node`` do not implement the abstract
                method.
        """
        raise NotImplementedError("All children of class Node must "
                                  "implement their own execute_epoch.")
    # endregion

    # region File block management
    def receive_part(self, replica: sd.FileBlockData) -> int:
        """Endpoint for file block replica reception.

        The ``Node`` stores a new :py:class:`file block replica
        <app.domain.helpers.smart_dataclasses.FileBlockData>` in
        :py:attr:`files` if it does not already hold a replica with the same
        block number for the same file.

        Args:
            replica:
                The :py:class:`file block replica
                <app.domain.helpers.smart_dataclasses.FileBlockData>` to be
                received by ``Node``.

        Returns:
            :py:class:`~app.domain.helpers.enums.HttpCodes`:
                BAD_REQUEST if the replica's ``sha256`` does not match the
                hash of its data; NOT_ACCEPTABLE if a replica with the same
                block number is already held; OK otherwise, i.e., the
                delivery is successful.
        """
        if replica.name not in self.files:
            # init dict that accepts <key: number, value: replica> pairs.
            self.files[replica.name] = {}

        if crypto.sha256(replica.data) != replica.sha256:
            # inform sender that his part is corrupt,
            # don't initiate recovery protocol - avoid DoS at current worker.
            return e.HttpCodes.BAD_REQUEST
        elif replica.number in self.files[replica.name]:
            # reject repeated blocks even if they are correct
            return e.HttpCodes.NOT_ACCEPTABLE
        else:
            # accepted file part
            self.files[replica.name][replica.number] = replica
            return e.HttpCodes.OK

    def replicate_part(
            self, cluster: th.ClusterType, replica: sd.FileBlockData) -> None:
        """Attempts to restore the replication level of the specified file
        block replica.

        Similar to :py:meth:`send_part` but with slightly different
        instructions. In particular new ``replicas`` can not be corrupted at
        the current node, at the current epoch.

        Note:
            There are no guarantees that
            :py:const:`~app.environment_settings.REPLICATION_LEVEL` will be
            completely restored during the execution of this method.

        Args:
            cluster (:py:class:`~app.type_hints.ClusterType`):
                A reference to the
                :py:class:`~app.domain.cluster_groups.Cluster` that will
                deliver the new ``replica``.
            replica (:py:class:`~app.domain.helpers.smart_dataclasses.FileBlockData`):
                The :py:class:`file block replica
                <app.domain.helpers.smart_dataclasses.FileBlockData>` to be
                delivered.

        Raises:
            NotImplementedError:
                When children of ``Node`` do not implement the abstract
                method.
        """
        raise NotImplementedError("All children of class Node must "
                                  "implement their own replicate_part.")

    def send_part(self,
                  cluster: th.ClusterType,
                  destination: str,
                  replica: sd.FileBlockData) -> th.HttpResponse:
        """Attempts to send a replica to some other network node.

        Args:
            cluster (:py:class:`~app.type_hints.ClusterType`):
                A reference to the
                :py:class:`~app.domain.cluster_groups.Cluster` that will
                deliver the new ``replica``. In a real world implementation
                this argument would not make sense, but we use it to
                facilitate simulation management and environment logging.
            destination:
                The name, address or another unique identifier of the node
                that will receive the file block `replica`.
            replica (:py:class:`~app.domain.helpers.smart_dataclasses.FileBlockData`):
                The file block container to be sent to some other worker.

        Returns:
            :py:class:`~app.domain.helpers.enums.HttpCodes`:
                Whatever code ``cluster.route_part`` replies with.
        """
        return cluster.route_part(self.id, destination, replica)

    def discard_part(self,
                     fid: str,
                     number: int,
                     corrupt: bool = False,
                     cluster: th.ClusterType = None) -> None:
        """Safely deletes a part from the ``Node`` instance's disk.

        Args:
            fid:
                Name of the file the file block replica belongs to.
            number:
                The part number that uniquely identifies the file block.
            corrupt:
                If discard is being invoked due to identified file block
                corruption, e.g., Sha256 does not match the expected.
            cluster (:py:class:`~app.type_hints.ClusterType`):
                :py:class:`~app.domain.cluster_groups.Cluster` that will
                :py:meth:`set the replication epoch
                <app.domain.cluster_groups.Cluster.set_replication_epoch>` or
                mark the simulation as failed. Required when ``corrupt`` is
                ``True``.

        Raises:
            ValueError:
                When a held replica is discarded as ``corrupt`` but no
                ``cluster`` was given to be notified.
        """
        # Missing files or parts make this a no-op.
        replica: sd.FileBlockData = self.files.get(fid, {}).pop(number, None)
        if replica and corrupt:
            if cluster is None:
                raise ValueError("discard_part requires a cluster when a "
                                 "replica is discarded due to corruption.")
            if replica.decrement_and_get_references() > 0:
                # Other replicas survive; schedule re-replication.
                cluster.set_replication_epoch(replica)
            else:
                cluster._set_fail(f"Lost last file block replica with id "
                                  f"{replica.id} due to corruption.")
    # endregion

    # region Node API
    def get_file_parts(self, fid: str) -> th.ReplicasDict:
        """Gets collection of file parts that correspond to the named file.

        Args:
            fid:
                The :py:attr:`file name identifier
                <app.domain.helpers.smart_dataclasses.FileData.name>` that
                designates the :py:class:`file block replicas
                <app.domain.helpers.smart_dataclasses.FileBlockData>` to be
                retrieved.

        Returns:
            :py:class:`~app.type_hints.ReplicasDict`:
                A dictionary where keys are :py:attr:`file block numbers
                <app.domain.helpers.smart_dataclasses.FileBlockData.number>`
                and values are :py:class:`file block replicas
                <app.domain.helpers.smart_dataclasses.FileBlockData>`. An
                empty dictionary when the file is unknown.
        """
        return self.files.get(fid, {})

    def get_file_parts_count(self, fid: str) -> int:
        """Counts the number of file block replicas of a specific file owned
        by the ``Node``.

        Args:
            fid:
                The :py:attr:`file name identifier
                <app.domain.helpers.smart_dataclasses.FileData.name>` that
                designates the :py:class:`file block replicas
                <app.domain.helpers.smart_dataclasses.FileBlockData>` to be
                counted.

        Returns:
            The number of counted replicas.
        """
        return len(self.files.get(fid, {}))

    def is_up(self) -> bool:
        """Returns ``True`` if the node is online, else ``False``."""
        return self.status == e.Status.ONLINE

    def is_suspect(self) -> bool:
        """Returns ``True`` if the node is behaving suspiciously,
        else ``False``."""
        return self.status == e.Status.SUSPECT
    # endregion

    # region Dunder methods' overrides
    # Nodes hash and compare like their identifier, so a ``Node`` and its
    # ``id`` string are interchangeable as dictionary keys.
    def __hash__(self):
        return hash(str(self.id))

    def __eq__(self, other):
        return self.id == other

    def __ne__(self, other):
        return not (self == other)
    # endregion
class SGNode(Node):
    """Represents a network node that executes a Swarm Guidance algorithm.

    Attributes:
        clusters:
            A collection of :py:class:`cluster groups
            <app.domain.cluster_groups.SGCluster>` the ``SGNode`` is a member
            of.
        routing_table (Dict[str, :py:class:`~pd:pandas.DataFrame`]):
            Maps file name identifiers to the single-column transition
            vectors used to route file block replicas to other ``SGNode``
            instances.
    """

    def __init__(self, uid: str, uptime: float) -> None:
        super().__init__(uid, uptime)
        self.clusters: Dict[str, th.ClusterType] = {}
        self.routing_table: Dict[str, pd.DataFrame] = {}

    # region Simulation steps
    def execute_epoch(self, cluster: th.ClusterType, fid: str) -> None:
        """Instructs the ``Node`` instance to execute the epoch.

        The method iterates all file block replicas in :py:attr:`files` and
        independently decides if they should be sent to another ``SGNode``
        by following the probabilities in :py:attr:`routing_table` column
        vectors.

        Overrides:
            :py:meth:`app.domain.network_nodes.Node.execute_epoch`.

        Args:
            cluster (:py:class:`~app.type_hints.ClusterType`):
                A reference to the
                :py:class:`~app.domain.cluster_groups.Cluster` that invoked
                the ``Node`` method.
            fid:
                The :py:attr:`file name identifier
                <app.domain.helpers.smart_dataclasses.FileData.name>` of the
                file being simulated.
        """
        # Iterate over a snapshot because discard_part mutates self.files.
        file_view: th.ReplicasDict = self.files.get(fid, {}).copy()
        for number, replica in file_view.items():
            self.replicate_part(cluster, replica)
            destination = self.select_destination(replica.name)
            response_code = self.send_part(cluster, destination, replica)
            if response_code == e.HttpCodes.OK:
                # Replica moved to ``destination``; drop the local copy.
                self.discard_part(fid, number)
            elif response_code == e.HttpCodes.BAD_REQUEST:
                self.discard_part(fid, number, corrupt=True, cluster=cluster)
            elif response_code in self.suspicious_replies:
                cluster.complain(self.id, destination, response_code)
            # Else keep file part for at least one more epoch
    # endregion

    # region File block management
    def replicate_part(
            self, cluster: th.ClusterType, replica: sd.FileBlockData) -> None:
        """Attempts to restore the replication level of the specified file
        block replica.

        Similar to :py:meth:`~Node.send_part` but with slightly different
        instructions. In particular new ``replicas`` can not be corrupted at
        the current node, at the current epoch. The replicas are sent
        selectively, in descending order of the cluster's ``v_`` vector
        (first column), whereas :py:meth:`send_part` follows stochastic
        swarm guidance routing.

        Overrides:
            :py:meth:`app.domain.network_nodes.Node.replicate_part`.

        Note:
            There are no guarantees that
            :py:const:`~app.environment_settings.REPLICATION_LEVEL` will be
            completely restored during the execution of this method.

        Args:
            cluster (:py:class:`~app.type_hints.ClusterType`):
                A reference to the
                :py:class:`~app.domain.cluster_groups.Cluster` that will
                deliver the new ``replica``.
            replica (:py:class:`~app.domain.helpers.smart_dataclasses.FileBlockData`):
                The :py:class:`file block replica
                <app.domain.helpers.smart_dataclasses.FileBlockData>` to be
                delivered.
        """
        # Number of times the block needs to be replicated.
        lost_replicas: int = replica.can_replicate(cluster.current_epoch)
        if lost_replicas > 0:
            sorted_members = list(
                cluster.v_.sort_values(0, ascending=False).index
            )
            for destination in sorted_members:
                if lost_replicas == 0:
                    break
                code = cluster.route_part(
                    self.id, destination, replica, is_fresh=True)
                if code == e.HttpCodes.OK:
                    lost_replicas -= 1
                    replica.references += 1
                elif code in self.suspicious_replies:
                    cluster.complain(self.id, destination, code)
            # replication level may have not been completely restored
            replica.update_epochs_to_recover(cluster.current_epoch)
    # endregion

    # region Routing table management
    # noinspection PyIncorrectDocstring
    def set_file_routing(
            self, fid: str, v_: Union[pd.Series, pd.DataFrame]) -> None:
        """Maps a file name identifier with a transition column vector used
        for file block replica routing.

        Args:
            fid:
                The :py:attr:`file name identifier
                <app.domain.helpers.smart_dataclasses.FileData.name>` of the
                file whose routing is being configured.
            v_ (Union[:py:class:`~pd:pandas.Series`, :py:class:`~pd:pandas.DataFrame`]):
                A column vector with probabilities that dictate the odds of
                sending file block replicas belonging to the file with the
                specified id to other Cluster members also working on the
                persistence of the file.

        Raises:
            ValueError:
                If ``v_`` is neither a :py:class:`~pd:pandas.Series` nor a
                :py:class:`~pd:pandas.DataFrame`.
        """
        if isinstance(v_, pd.Series):
            self.routing_table[fid] = v_.to_frame()
        elif isinstance(v_, pd.DataFrame):
            self.routing_table[fid] = v_
        else:
            # Adjacent literals concatenate into a single message argument.
            raise ValueError("set_file_routing method expects a pandas.Series "
                             "or pandas.DataFrame as transition vector type.")

    def remove_file_routing(self, fid: str) -> None:
        """Removes a file name from the ``SGNode`` routing table.

        This method is called when a ``SGNode`` is evicted from the
        :py:class:`cluster group <app.domain.cluster_groups.SGCluster>` and
        results in the deletion from disk of all :py:class:`file block
        replicas <app.domain.helpers.smart_dataclasses.FileBlockData>` with
        identifier ``fid``.

        Args:
            fid:
                The :py:attr:`file name identifier
                <app.domain.helpers.smart_dataclasses.FileData.name>` of the
                file whose routing is being eliminated.
        """
        self.routing_table.pop(fid, pd.DataFrame())
        self.files.pop(fid, {})
    # endregion

    # region Swarm guidance
    def select_destination(self, fid: str) -> str:
        """Selects a random message destination according to
        :py:attr:`routing_table` probabilities for the specified file name.

        If the routing vector is not a valid probability distribution the
        vector is printed and the process exits with the full traceback.

        Args:
            fid:
                The :py:attr:`file name identifier
                <app.domain.helpers.smart_dataclasses.FileData.name>` to
                obtain the proper :py:attr:`routing_table` for destination
                selection.

        Returns:
            The name or address of the selected destination.
        """
        routing_vector: pd.DataFrame = self.routing_table[fid]
        hive_members = list(routing_vector.index)
        member_chances = list(routing_vector.iloc[:, 0])
        try:
            return np.random.choice(a=hive_members, p=member_chances).item()
        except ValueError as vE:
            print(f"{routing_vector}\nStochastic?: {np.sum(member_chances)}")
            # Positional arguments keep this call valid on Python 3.10+,
            # where the ``etype`` keyword of format_exception was removed.
            sys.exit("".join(
                traceback.format_exception(type(vE), vE, vE.__traceback__)))
    # endregion
class SGNodeExt(SGNode):
    """Swarm Guidance network node whose expiry is reported as suspicion.

    Whereas a plain :py:class:`SGNode` simply goes offline, an
    ``SGNodeExt`` whose time to live runs out transitions to the
    ``SUSPECT`` :py:attr:`~Node.status`. This lets the peers in its
    :py:class:`cluster groups <app.domain.cluster_groups.SGClusterExt>`
    monitor connectivity and suspicious behaviours.
    """

    # region Simulation steps
    def update_status(self) -> int:
        """Consumes one epoch of time to live and updates the status.

        Only an online node ages. When its remaining :py:attr:`uptime`
        drops to zero or below, a notice is printed and the node becomes
        ``SUSPECT``.

        Overrides:
            :py:meth:`app.domain.network_nodes.Node.update_status`.

        Returns:
            :py:class:`~app.domain.helpers.enums.Status`:
                The node's status after this update.
        """
        if not self.is_up():
            return self.status

        self.uptime -= 1
        expired = self.uptime <= 0
        if expired:
            print(f" [x] {self.id} now offline (suspect status).")
            self.status = e.Status.SUSPECT
        return self.status
    # endregion
class HDFSNode(Node):
    """Represents a data node in the Hadoop Distributed File System."""

    # region Simulation steps
    def update_status(self) -> int:
        """Consumes one epoch of time to live and updates the status.

        Only an online node ages. Once its remaining :py:attr:`uptime`
        reaches zero or below, a notice is printed and the node becomes
        ``SUSPECT``.

        Overrides:
            :py:meth:`app.domain.network_nodes.Node.update_status`.

        Returns:
            :py:class:`~app.domain.helpers.enums.Status`:
                The node's status after this update.
        """
        if not self.is_up():
            return self.status

        self.uptime -= 1
        expired = self.uptime <= 0
        if expired:
            print(f" [x] {self.id} now offline (suspect status).")
            self.status = e.Status.SUSPECT
        return self.status

    def execute_epoch(self, cluster: th.ClusterType, fid: str) -> None:
        """Runs one simulation epoch for the file ``fid`` on this data node.

        Each held replica first gets a chance to be re-replicated. Then,
        with the cluster's corruption odds, it is silently dropped. HDFS only
        verifies ``sha256`` when a client reads a replica, so no replication
        epoch is scheduled here. The loss is still recorded through the
        cluster's file logger.

        Overrides:
            :py:meth:`app.domain.network_nodes.Node.execute_epoch`.

        Args:
            cluster (:py:class:`~app.type_hints.ClusterType`):
                The :py:class:`~app.domain.cluster_groups.Cluster` driving
                this epoch.
            fid:
                The :py:attr:`file name identifier
                <app.domain.helpers.smart_dataclasses.FileData.name>` of the
                file being simulated.
        """
        # Snapshot, because discard_part removes entries from self.files.
        snapshot: th.ReplicasDict = self.files.get(fid, {}).copy()
        for block_number, block in snapshot.items():
            self.replicate_part(cluster, block)
            corrupted = np.random.choice(
                a=es.TRUE_FALSE, p=cluster.corruption_chances)
            if not corrupted:
                continue
            # corrupt is deliberately left False: passing True would make
            # discard_part call cluster.set_replication_epoch, but HDFS
            # corruption is silent.
            self.discard_part(fid, block_number)
            cluster.file.logger.log_corrupted_file_blocks(
                1, cluster.current_epoch)
    # endregion

    # region File block management
    def replicate_part(
            self, cluster: th.ClusterType, replica: sd.FileBlockData) -> None:
        """Tries to bring ``replica`` back up to its replication level.

        Cluster members are tried from the longest remaining
        :py:attr:`~Node.uptime` to the shortest, until enough fresh copies
        have been accepted or the members run out.

        Overrides:
            :py:meth:`app.domain.network_nodes.Node.replicate_part`.

        Note:
            :py:const:`~app.environment_settings.REPLICATION_LEVEL` is not
            guaranteed to be fully restored by a single call.

        Args:
            cluster (:py:class:`~app.type_hints.ClusterType`):
                The :py:class:`~app.domain.cluster_groups.Cluster` that
                routes the fresh copies.
            replica (:py:class:`~app.domain.helpers.smart_dataclasses.FileBlockData`):
                The :py:class:`file block replica
                <app.domain.helpers.smart_dataclasses.FileBlockData>` to copy.
        """
        missing: int = replica.can_replicate(cluster.current_epoch)
        if missing <= 0:
            return

        # NOTE(review): destinations are Node objects here, not ids; this
        # relies on Node hashing/comparing like its id - confirm route_part.
        by_reliability = sorted(cluster.members.values(),
                                key=lambda member: member.uptime,
                                reverse=True)
        for target in by_reliability:
            if missing == 0:
                break
            reply = cluster.route_part(
                self.id, target, replica, is_fresh=True)
            if reply == e.HttpCodes.OK:
                missing -= 1
                replica.references += 1
        # The replication level may still be below target; let the replica
        # reschedule its recovery.
        replica.update_epochs_to_recover(cluster.current_epoch)
    # endregion
class NewscastNode(Node):
    """Represents a Peer running Newscast protocol, using shuffling
    techniques to exchange acquaintances with other network peers and
    performing peer degree aggregation using AverageFunction.

    Attributes:
        view:
            A partial view of the P2P network. ``View`` is a collection of
            :py:class:`network nodes <app.domain.network_nodes.NewscastNode>`
            that the ``NewscastNode`` instance may contact, other than
            itself. Keys are :py:class:`NewscastNode` instances, and values
            are their age in the dictionary. A key-value pair is commonly
            referenced as a ``descriptor``. At most
            :py:const:`~app.environment_settings.NEWSCAST_CACHE_SIZE`
            descriptors are kept.
        aggregation_value:
            Stores the aggregation value. The type of ``aggregation_value``
            is defined by the body of the :py:meth:`aggregate` method.
    """

    def __init__(self, uid: str, uptime: float) -> None:
        super().__init__(uid, uptime)
        self.view: _NetworkView = {}
        self.aggregation_value: Any = 0

    # region Node API
    def add_neighbor(self, node: NewscastNode) -> bool:
        """Adds a new network node to the node instance's view.

        If the view is full, the eldest node is replaced with the new node.
        Otherwise, the new :py:class:`NewscastNode` is added to the
        instance's view with age zero. Nothing is added if the entry is
        already in :py:attr:`view` or the ``node`` is the current
        ``NewscastNode`` instance.

        Returns:
            ``True`` if ``node`` was successfully added, ``False`` otherwise.
        """
        if node.id == self.id or node in self.view:
            return False

        view_size = len(self.view)
        if view_size < es.NEWSCAST_CACHE_SIZE:
            self.view[node] = 0
            return True

        if view_size == es.NEWSCAST_CACHE_SIZE:
            # Evict the first descriptor with the largest age.
            k = tuple(self.view)
            v = tuple(self.view.values())
            oldest_node = k[v.index(max(v))]
            self.view.pop(oldest_node)
            self.view[node] = 0
            return True

        return False

    def get_degree(self) -> int:
        """Counts the number of descriptors in the node's view.

        Returns:
            The degree of the ``NewscastNode`` instance.
        """
        return len(self.view)

    def get_node(self) -> Optional[NewscastNode]:
        """Gets a random online node from the current network view.

        A uniformly random neighbor is tried first. If it is not online, the
        neighbors are scanned in :py:attr:`view` order. Each offline node met
        during the scan is removed from the :py:attr:`view`.

        Note:
            Newscast should always return a random node, thus iteration
            should not be used, but this search is more efficient and
            readable.

        Returns:
            The selected ``NewscastNode``, or ``None`` if the view is empty
            or holds no online node.
        """
        if self.get_degree() == 0:
            return None

        neighbors = tuple(self.view)
        i = np.random.randint(0, len(neighbors))
        candidate_node = neighbors[i]
        if candidate_node.is_up():
            return candidate_node

        for candidate_node in neighbors:
            if candidate_node.is_up():
                return candidate_node
            self.view.pop(candidate_node, None)
        return None
    # endregion

    # region Simulation steps
    def update_status(self) -> int:
        """Used to update the time to live of the node instance.

        An online node consumes one epoch of :py:attr:`uptime`. On expiry
        it clears its view and becomes ``SUSPECT``. A node that is not
        online comes back with a 16% chance per epoch, with a fresh time to
        live.

        Overrides:
            :py:meth:`app.domain.network_nodes.Node.update_status`.

        Returns:
            :py:class:`~app.domain.helpers.enums.Status`:
                The status of the ``Node``.
        """
        if self.is_up():
            self.uptime -= 1
            if self.uptime <= 0:
                self.view = {}
                self.status = e.Status.SUSPECT
        elif random.uniform(0, 1) < 0.16:
            # Peer comes online with 16% chance per epoch after going offline.
            ttl = math.floor(random.uniform(0.04, 0.32) * ms.Master.MAX_EPOCHS)
            self.uptime = ttl
            self.status = e.Status.ONLINE
        return self.status

    def execute_epoch(self, cluster: th.ClusterType, fid: str) -> None:
        """Instructs the ``NewscastNode`` instance to execute the epoch.

        All descriptors age by one. The instance then picks another online
        ``NewscastNode``, shuffles views with it and aggregates their degree
        using the Average Function. The resulting aggregation value is
        logged in the cluster.

        Overrides:
            :py:meth:`app.domain.network_nodes.Node.execute_epoch`.

        Args:
            cluster (:py:class:`~app.type_hints.ClusterType`):
                A reference to the
                :py:class:`~app.domain.cluster_groups.Cluster` that invoked
                the ``Node`` method.
            fid:
                The :py:attr:`file name identifier
                <app.domain.helpers.smart_dataclasses.FileData.name>` of the
                file being simulated (unused by Newscast).
        """
        # increase all descriptors' age
        for k in self.view:
            self.view[k] += 1

        # Select a random node from the view; if the current node is
        # isolated, query a central server to find a gateway to the overlay.
        node = self.get_node() or cluster.get_node()
        if node is None or not node.is_up():
            return

        # use newscast behavior to shuffle views and aggregate peer degrees
        self.shuffle(node)
        self.aggregate(node)
        cluster.log_aggregation(self.aggregation_value)
    # endregion

    # region File block management
    def replicate_part(
            self, cluster: th.ClusterType, replica: sd.FileBlockData) -> None:
        """No-op: Newscast peers do not replicate file blocks."""
        pass
    # endregion

    # region Newscast peer shuffling and aggregation
    def aggregate(self, node: NewscastNode = None) -> None:
        """Averages this node's degree with another node's degree.

        The network node instance contacts another node from its view.
        Both nodes then assign the mean of their degrees to
        :py:attr:`aggregation_value`.

        Args:
            node:
                The node to contact. When ``None``, a random
                ``NewscastNode`` is selected from :py:attr:`view`.
        """
        candidate_node = node or self.get_node()
        if candidate_node is None:
            return
        mean = (self.get_degree() + candidate_node.get_degree()) / 2
        candidate_node.aggregation_value = mean
        self.aggregation_value = mean

    def shuffle(self, node: NewscastNode) -> None:
        """Exchanges views with ``node``, then merges and crops both.

        The final view consists of the most up to date descriptors from
        both :py:attr:`views <view>`, up to a maximum of
        :py:const:`~app.environment_settings.NEWSCAST_CACHE_SIZE`
        descriptors. It never includes the current node.

        Args:
            node: The node to be contacted for shuffling.
        """
        my_view = dict(self.view)
        my_view[self] = 0  # fresh descriptor of ourselves for the peer
        his_view = node.shuffle_request(my_view)
        buffer = self._merge(self.view, his_view)
        self.view = self._select_view(buffer)

    def shuffle_request(self, senders_view: _NetworkView) -> _NetworkView:
        """Merges the sender's view into this node's view and crops it.

        The final view consists of the most up to date descriptors from
        both :py:attr:`views <view>`, up to a maximum of
        :py:const:`~app.environment_settings.NEWSCAST_CACHE_SIZE`
        descriptors. It never includes the current node.

        Args:
            senders_view:
                A dictionary where keys are :py:class:`network nodes <Node>`
                and values are their respective age in the view.

        Returns:
            This node's :py:attr:`view` plus a fresh ``descriptor`` of the
            ``NewscastNode`` instance, as it was before the merge with the
            requestor's view.
        """
        my_view = dict(self.view)
        my_view[self] = 0
        buffer = self._merge(self.view, senders_view)
        self.view = self._select_view(buffer)
        return my_view

    def _merge(self, a: _NetworkView, b: _NetworkView) -> _NetworkView:
        """Merges two network views into ``a``, in place.

        If a node descriptor exists in both views, the most recent one, i.e.
        the one with the lowest age, is kept.

        Args:
            a:
                A dictionary where keys are :py:class:`network nodes <Node>`
                and values are their respective age in the view. Mutated
                and returned.
            b:
                A dictionary where keys are :py:class:`network nodes <Node>`
                and values are their respective age in the view.

        Returns:
            The union of both views with only the most up to date
            descriptors.
        """
        for k in b:
            a[k] = min(a[k], b[k]) if k in a else b[k]
        return a

    def _select_view(self, view_buffer: _NetworkView) -> _NetworkView:
        """Reduces the size of the view to a predefined maximum size.

        Args:
            view_buffer:
                A dictionary where keys are :py:class:`network nodes <Node>`
                and values are their respective age in the view.

        Returns:
            The youngest descriptors of ``view_buffer``, at most
            :py:const:`~app.environment_settings.NEWSCAST_CACHE_SIZE` of
            them, excluding this node's own descriptor.
        """
        # Peers usually know this node, so merged views can contain its own
        # descriptor. A node must never be its own neighbor (see
        # add_neighbor), so drop it before cropping.
        descriptors = [(k, age) for k, age in view_buffer.items() if k != self]
        descriptors.sort(key=lambda x: x[1])
        return dict(descriptors[:es.NEWSCAST_CACHE_SIZE])
    # endregion