Source code for ncdiff.calculator

import logging
from functools import lru_cache

from ncclient import xml_

from .ref import IdentityRef, InstanceIdentifier
from .errors import ConfigError

# create a logger for this module
logger = logging.getLogger(__name__)

nc_url = xml_.BASE_NS_1_0
yang_url = 'urn:ietf:params:xml:ns:yang:1'
operation_tag = '{' + nc_url + '}operation'
insert_tag = '{' + yang_url + '}insert'
value_tag = '{' + yang_url + '}value'
key_tag = '{' + yang_url + '}key'


[docs]class BaseCalculator(object): '''BaseCalculator A set of methods to help calculate substruction and addition of two Config instances. This is a base class, which is inherited by protocol calculator classes, for example: NetconfCalculator and RestconfCalculator. Attributes ---------- device : `object` An instance of yang.ncdiff.ModelDevice, which represents a modeled device. etree1 : `Element` A lxml Element which contains the config. etree2 : `Element` A lxml Element which contains the other config. ''' def __init__(self, device, etree1, etree2): ''' __init__ instantiates a BaseCalculator instance. ''' self.device = device self.etree1 = etree1 self.etree2 = etree2 self.__attach_per_instance_cache() @staticmethod def _del_attrib(element): '''_del_attrib Low-level api: Delete four attributes from an Element node if they exist: operation, insert, value and key. Parameters ---------- element : `Element` The Element node needs to be looked at. Returns ------- Element The Element node is returned after processing. ''' for ele in element.iter(): for attribute in ele.attrib.keys(): del ele.attrib[attribute] return element @staticmethod def _get_sequence(scope, tag, parent): '''_get_sequence Low-level api: Return a list of children of a parent with the same tag within the scope. Parameters ---------- scope : `list` List members can be an element, or a tuple of two elements. tag : `str` Identifier in `{url}tagname` notation. parent : `Element` The parent node. Returns ------- list A list of children with the same tag within the scope. ''' new_scope = [] for item in scope: if isinstance(item, tuple): one, two = item if one.getparent() == parent: new_scope.append(one) else: new_scope.append(two) else: new_scope.append(item) return [child for child in parent.iterchildren(tag=tag) if child in new_scope] def _pair_children(self, node_one, node_two): """_pair_children pair all children with their peers, resulting in a list of Tuples Parameters ---------- node_one : `Element` An Element node in one Config instance. node_two : `Element` An Element node in the other Config instance. Returns ------- list List of matching pairs, one of both items in the pair can be None """ # Hash based approach, build two hashtables and match # keys are (tag, self-key) # self-key is # text for leaf-list # key tuple for list # none for others def find_one_child(node, tag): """ Find exactly one child with the given tag""" s = list(node.iterchildren(tag=tag)) if len(s) < 1: raise ConfigError("cannot find key '{}' in node {} under {}" \ .format(tag, node.tag, self.device \ .get_xpath(node.getparent()))) if len(s) > 1: raise ConfigError("not unique key '{}' in node {}" \ .format(tag, self.device.get_xpath(node))) return s[0] def build_index_tuple(node, keys, s_node): """ build a tuple containing the text of all fields listed in keys, taken from node s_node is passed to prevent it being looked up twice. """ out = [self._parse_text(find_one_child(node, k), s_node) for k in keys] return tuple(out) type_for_tag = {} def get_type_for_tag(tag, child): """ Given a child node with a given tag, find the type caches based on tag :return: a tuple, containing the s_node and node type """ node_type = type_for_tag.get(tag, None) if node_type is not None: return node_type s_node = self.device.get_schema_node(child) node_type = s_node.get('type') result = (s_node, node_type) type_for_tag[tag] = result return result def build_unique_id(child): """ Build the hash key for a node """ tag = child.tag key = None s_node, node_type = get_type_for_tag(tag, child) if node_type == 'leaf-list': key = self._parse_text(child, s_node) elif node_type == 'list': keys = self._get_list_keys(s_node) key = build_index_tuple(child, keys, s_node) return (tag, key) # build the hashmap for node_one ones = {} for child in node_one.getchildren(): key = build_unique_id(child) if key in ones: raise ConfigError('not unique peer of node {} {}' \ .format(child, ones[key])) ones[key] = child # build the hashmap for node_two twos = {} for child in node_two.getchildren(): key = build_unique_id(child) if key in twos: raise ConfigError('not unique peer of node {} {}' \ .format(child, twos[key])) twos[key] = child # make pairs, in order one_lookup = set(ones.keys()) keys_in_order = list(ones.keys()) keys_in_order.extend([two for two in twos.keys() if two not in one_lookup]) return [(ones.get(uid, None), twos.get(uid, None)) for uid in keys_in_order] def _group_kids(self, node_one, node_two): '''_group_kids Low-level api: Consider an Element node in a Config instance. Now we have two Config instances and we want to compare two corresponding Element nodes. This method group children of these nodes in three categories: some only exist in node_one, some only exist in node_two, and some chiildren of node_one have peers in node_two. Parameters ---------- node_one : `Element` An Element node in one Config instance. node_two : `Element` An Element node in the other Config instance. Returns ------- tuple There are three elements in the tuple. The first element is a list of children of node_one that do not have any peers in node_two. The second element is a list of children of node_two that do not have any peers in node_one. The last element is a list of tuples, and each tuple represents a pair of peers. ''' in_1_not_in_2 = [] in_2_not_in_1 = [] in_1_and_in_2 = [] for one, two in self._pair_children(node_one, node_two): if one is None: in_2_not_in_1.append(two) elif two is None: in_1_not_in_2.append(one) else: in_1_and_in_2.append((one, two)) return (in_1_not_in_2, in_2_not_in_1, in_1_and_in_2) @staticmethod def _get_list_keys(schema_node): '''_get_list_keys Low-level api: Given a schema node, in particular, a list type schema node, it returns a list of keys. Parameters ---------- schema_node : `Element` A schema node. Returns ------- list A list of tags of keys in `{url}tagname` notation. ''' nodes = list(filter(lambda x: x.get('is_key'), schema_node.getchildren())) return sorted([n.tag for n in nodes]) def _get_peers(self, child_self, parent_other): '''_get_peers Low-level api: Given a config node, find peers under a parent node. Parameters ---------- child_self : `Element` An Element node on this side. parent_other : `Element` An Element node on the other side. Returns ------- list A list of children of parent_other who are peers of child_self. ''' peers = parent_other.findall(child_self.tag) s_node = self.device.get_schema_node(child_self) if s_node.get('type') == 'leaf-list': return list(filter(lambda x: self._same_text(child_self, x), peers)) elif s_node.get('type') == 'list': keys = self._get_list_keys(s_node) return list(filter(lambda x: self._is_peer(keys, child_self, x), peers)) else: return peers def _is_peer(self, keys, node_self, node_other): '''_is_peer Low-level api: Return True if node_self and node_other are considered as peer with regards to a set of keys. Parameters ---------- keys : `list` A list of keys in `{url}tagname` notation. node_self : `Element` An Element node on this side. node_other : `Element` An Element node on the other side. Returns ------- list True if node_self is a peer of node_other, otherwise, return False. ''' for key in keys: s = list(node_self.iterchildren(tag=key)) o = list(node_other.iterchildren(tag=key)) if len(s) < 1 or len(o) < 1: raise ConfigError("cannot find key '{}' in node {}" \ .format(key, self.device.get_xpath(node_self))) if len(s) > 1 or len(o) > 1: raise ConfigError("not unique key '{}' in node {}" \ .format(key, self.device.get_xpath(node_self))) if not self._same_text(s[0], o[0]): return False return True def __attach_per_instance_cache(self): """ We want to cache _parse_text, as it is an expensive call Python functools conveniently provides the @lru_cache annotation (and no other way of having a pre-made lru_cache) The lru_cache annotation however locks all arguments in memory, including self This causes both trees for this calculator to be locked in memory We want to use the @lru_cache annotation, but release the cache when this calculator is garbage collected For this, we use the following construction: https://stackoverflow.com/questions/14946264/python-lru-cache-decorator-per-instance """ self._parse_text = lru_cache(maxsize=128)(self._parse_text) # cache because this is an expensive call, often called multiple times on the same node in rapid succession def _parse_text(self, node, schema_node=None): '''_parse_text Low-level api: Return text if a node. Pharsing is required if the node data type is identityref or instance-identifier. Parameters ---------- node : `Element` An Element node in data tree. Returns ------- None or str None if the node does not have text, otherwise, text string of the node. ''' if node.text is None: return None if schema_node is None: schema_node = self.device.get_schema_node(node) if schema_node.get('datatype') is not None and \ (schema_node.get('datatype')[:11] == 'identityref'): idref = IdentityRef(self.device, node) return idref.default elif schema_node.get('datatype') is not None and \ schema_node.get('datatype') == 'instance-identifier': instanceid = InstanceIdentifier(self.device, node) return instanceid.default else: if schema_node.get("type") == "container": # prevent whitespace in container to cause problems return None return node.text def _same_text(self, node1, node2): '''_same_text Low-level api: Compare text values of two nodes. Parameters ---------- node1 : `Element` An Element node in data tree. node2 : `Element` An Element node in data tree. Returns ------- bool True if text values of two nodes are same, otherwise, False. ''' if node1.text is None and node2.text is None: return True return self._parse_text(node1) == self._parse_text(node2) def _merge_text(self, from_node, to_node): '''_merge_text Low-level api: Set text value of to_node according to the text value of from_node. Parameters ---------- from_node : `Element` An Element node in data tree. to_node : `Element` An Element node in data tree. Returns ------- None There is no return of this method. ''' if from_node.text is None: to_node.text = None return schema_node = self.device.get_schema_node(from_node) if schema_node.get('datatype') is not None and \ schema_node.get('datatype')[:11] == 'identityref': idref = IdentityRef(self.device, from_node, to_node=to_node) to_node.text = idref.converted elif schema_node.get('datatype') is not None and \ schema_node.get('datatype') == 'instance-identifier': instanceid = InstanceIdentifier(self.device, from_node, to_node=to_node) to_node.text = instanceid.converted else: to_node.text = from_node.text @property def le(self): return self._node_le(self.etree1, self.etree2) @property def lt(self): return self._node_le(self.etree1, self.etree2) and \ not self._node_le(self.etree2, self.etree1) @property def ge(self): return self._node_le(self.etree2, self.etree1) @property def gt(self): return self._node_le(self.etree2, self.etree1) and \ not self._node_le(self.etree1, self.etree2) @property def eq(self): return self._node_le(self.etree1, self.etree2) and \ self._node_le(self.etree2, self.etree1) @property def ne(self): return not self._node_le(self.etree1, self.etree2) or \ not self._node_le(self.etree2, self.etree1) def _node_le(self, node_self, node_other): '''_node_le Low-level api: Return True if all descendants of one node exist in the other node. Otherwise False. This is a recursive method. Parameters ---------- node_self : `Element` A node to be compared. node_other : `Element` Another node to be compared. Returns ------- bool True if all descendants of node_self exist in node_other, otherwise False. ''' for x in ['tag', 'tail']: if node_self.__getattribute__(x) != node_other.__getattribute__(x): return False if not self._same_text(node_self, node_other): return False for a in node_self.attrib: if a not in node_other.attrib or \ node_self.attrib[a] != node_other.attrib[a]: return False for child, child_other in self._pair_children(node_self, node_other): if child is None: # only in other, meaningless continue if child_other is None: # only in other, false return False # both are present schma_node = self.device.get_schema_node(child) ordered_by = schma_node.get('ordered-by') child_type = schma_node.get('type') if ordered_by == 'user' and ( child_type == 'leaf-list' or child_type == 'list'): elder_siblings = list(child.itersiblings(tag=child.tag, preceding=True)) if elder_siblings: immediate_elder_sibling = elder_siblings[0] peers_of_immediate_elder_sibling = \ self._get_peers(immediate_elder_sibling, node_other) if len(peers_of_immediate_elder_sibling) < 1: return False elif len(peers_of_immediate_elder_sibling) > 1: p = self.device.get_xpath(immediate_elder_sibling) raise ConfigError('not unique peer of node {}' \ .format(p)) elder_siblings_of_peer = \ list(child_other.itersiblings(tag=child.tag, preceding=True)) if peers_of_immediate_elder_sibling[0] not in \ elder_siblings_of_peer: return False if not self._node_le(child, child_other): return False return True