Source code for ncdiff.netconf

import re
import logging
from lxml import etree
from copy import deepcopy
from ncclient import operations, xml_

from .errors import ConfigDeltaError, ModelError
from .calculator import BaseCalculator

# create a logger for this module
logger = logging.getLogger(__name__)

nc_url = xml_.BASE_NS_1_0
yang_url = 'urn:ietf:params:xml:ns:yang:1'
config_tag = '{' + nc_url + '}config'
filter_tag = '{' + nc_url + '}filter'
operation_tag = '{' + nc_url + '}operation'
insert_tag = '{' + yang_url + '}insert'
value_tag = '{' + yang_url + '}value'
key_tag = '{' + yang_url + '}key'


def _inserterror(direction, path, attr_name, attr_value=None):
    if attr_value:
        raise ConfigDeltaError("attribute wrong: try to insert the node "
                               "{} {} another node, which cannot be found "
                               "by attribute '{}={}'"
                               .format(path, direction, attr_name, attr_value))
    else:
        raise ConfigDeltaError("attribute missing: try to insert the node "
                               "{} {} another node, but it does not have "
                               "attribute '{}'"
                               .format(path, direction, attr_name))


def _copy_element(new_parent, element):
    new_child = etree.SubElement(new_parent, element.tag,
                                 attrib=element.attrib,
                                 nsmap=element.nsmap)
    if element.text is not None:
        new_child.text = element.text
    for subelement in element:
        _copy_element(new_child, subelement)


def _copy_elements(new_parent, elements):
    for element in elements:
        _copy_element(new_parent, element)


[docs]class NetconfParser(object): '''NetconfParser A parser to convert a reply of get-config to an lxml Element object. Attributes ---------- ele : `Element` An lxml Element object which is the root of the config tree. ''' def __init__(self, device, get_reply): self.device = device self.reply = get_reply self._ele = None @property def ele(self): if self._ele is None: if isinstance(self.reply, str): parser = etree.XMLParser(remove_blank_text=True) self._ele = self.retrieve_config(etree.XML(self.reply, parser)) elif etree.iselement(self.reply): self._ele = self.retrieve_config(self.reply) elif isinstance(self.reply, operations.rpc.RPCReply): self._ele = self.retrieve_config(self.reply._root) return self._ele
[docs] @staticmethod def retrieve_config(element): '''retrieve_config High-level api: Retrive config from rpc-reply. Parameters ---------- element : `Element` A rpc-reply or content of Config instance. Returns ------- Element A new element which represents config data. ''' if not etree.iselement(element): raise TypeError("argument 'element' must be Element not '{}'" .format(type(element))) ret = etree.Element(config_tag, nsmap={'nc': nc_url}) _copy_elements(ret, element.xpath('/nc:rpc-reply/nc:data/*', namespaces={'nc': nc_url})) _copy_elements(ret, element.xpath('/nc:data/*', namespaces={'nc': nc_url})) _copy_elements(ret, element.xpath('/nc:config/*', namespaces={'nc': nc_url})) _copy_elements(ret, element.xpath('/nc:rpc/nc:edit-config/nc:config/*', namespaces={'nc': nc_url})) _copy_elements(ret, element.xpath('/nc:edit-config/nc:config/*', namespaces={'nc': nc_url})) return ret
[docs]class NetconfCalculator(BaseCalculator): '''NetconfCalculator A Netconf calculator to do subtraction and addition. A subtraction is to compute the delta between two Config instances in a form of Netconf edit-config. An addition is to apply one Netconf edit-config to a Config instance. Attributes ---------- sub : `Element` Content of a Netconf edit-config which can achieve a transition from one config, i.e., self.etree2, to another config, i.e., self.etree1. add : `Element` Content of a Config instance. preferred_create : `str` Preferred operation of creating a new element. Choice of 'merge', 'create' or 'replace'. preferred_replace : `str` Preferred operation of replacing an existing element. Choice of 'merge' or 'replace'. preferred_delete : `str` Preferred operation of deleting an existing element. Choice of 'delete' or 'remove'. ''' def __init__(self, device, etree1, etree2, preferred_create='merge', preferred_replace='merge', preferred_delete='delete'): ''' __init__ instantiates a NetconfCalculator instance. ''' BaseCalculator.__init__(self, device, etree1, etree2) if preferred_create in ['merge', 'create', 'replace']: self.preferred_create = preferred_create else: raise ValueError("only 'merge', 'create' or 'replace' are valid " "values of 'preferred_create'") if preferred_replace in ['merge', 'replace']: self.preferred_replace = preferred_replace else: raise ValueError("only 'merge' or 'replace' are valid " "values of 'preferred_replace'") if preferred_delete in ['delete', 'remove']: self.preferred_delete = preferred_delete else: raise ValueError("only 'delete' or 'remove' are valid " "values of 'preferred_delete'") @property def add(self): ele1 = deepcopy(self.etree1) ele2 = deepcopy(self.etree2) self.node_add(ele1, ele2) return ele1 @property def sub(self): ele1 = deepcopy(self.etree1) ele2 = deepcopy(self.etree2) self.node_sub(ele1, ele2) return ele1
[docs] def node_add(self, node_sum, node_other): '''node_add High-level api: Combine two configs or apply an instance of ConfigDelta to a config. This method is recursive. node_sum will be modified during the process, and it becomes the result at the end. Parameters ---------- node_sum : `Element` A config node in a config tree. node_other : `Element` A config node in another config tree. Returns ------- None There is no return of this method. ''' supported_node_type = [ 'leaf', 'leaf-list', 'container', 'list', ] in_s_not_in_o, in_o_not_in_s, in_s_and_in_o = \ self._group_kids(node_sum, node_other) for child_self in in_s_not_in_o: pass for child_other in in_o_not_in_s: this_operation = child_other.get(operation_tag, default='merge') # delete if this_operation == 'delete': raise ConfigDeltaError( 'data-missing: try to delete node {} but it does not ' 'exist in config'.format( self.device.get_xpath(child_other) ) ) # remove elif this_operation == 'remove': pass # merge, create, replace or none elif ( this_operation == 'merge' or this_operation == 'replace' or this_operation == 'create' ): s_node = self.device.get_schema_node(child_other) if s_node.get('type') in supported_node_type: getattr( self, '_node_add_without_peer_{}'.format( s_node.get('type').replace('-', '') ) )(node_sum, child_other) else: raise ConfigDeltaError( "unknown operation: node {} contains operation " "'{}'".format( self.device.get_xpath(child_other), this_operation ) ) for child_self, child_other in in_s_and_in_o: s_node = self.device.get_schema_node(child_self) if s_node.get('type') in supported_node_type: getattr( self, '_node_add_with_peer_{}'.format( s_node.get('type').replace('-', '') ) )(child_self, child_other) if not list(child_self): if ( s_node.get('type') == 'container' and s_node.get('presence') != 'true' ): node_sum.remove(child_self)
def _node_add_without_peer_leaf(self, node_sum, child_other): '''_node_add_without_peer_leaf Low-level api: Apply delta child_other to node_sum when there is no peer of child_other can be found under node_sum. child_other is a leaf node. Element node_sum will be modified during the process. Parameters ---------- node_sum : `Element` A config node in a config tree. child_other : `Element` A child of a config node in another config tree. This child has no peer under node_sum. Returns ------- None There is no return of this method. ''' e = deepcopy(child_other) node_sum.append(self._del_attrib(e)) def _node_add_without_peer_leaflist(self, node_sum, child_other): '''_node_add_without_peer_leaflist Low-level api: Apply delta child_other to node_sum when there is no peer of child_other can be found under node_sum. child_other is a leaf-list node. Element node_sum will be modified during the process. Parameters ---------- node_sum : `Element` A config node in a config tree. child_other : `Element` A child of a config node in another config tree. This child has no peer under node_sum. Returns ------- None There is no return of this method. ''' s_node = self.device.get_schema_node(child_other) e = deepcopy(child_other) scope = node_sum.getchildren() siblings = self._get_sequence(scope, child_other.tag, node_sum) if s_node.get('ordered-by') == 'user' and \ child_other.get(insert_tag) is not None: if child_other.get(insert_tag) == 'first': if siblings: siblings[0].addprevious(self._del_attrib(e)) else: node_sum.append(self._del_attrib(e)) elif child_other.get(insert_tag) == 'last': if siblings: siblings[-1].addnext(self._del_attrib(e)) else: node_sum.append(self._del_attrib(e)) elif child_other.get(insert_tag) == 'before': if child_other.get(value_tag) is None: _inserterror('before', self.device.get_xpath(child_other), 'value') siblings = node_sum.findall(child_other.tag) sibling = [s for s in siblings if s.text == child_other.get(value_tag)] if not sibling: path = self.device.get_xpath(child_other) value = child_other.get(value_tag) _inserterror('before', path, 'value', value) sibling[0].addprevious(self._del_attrib(e)) elif child_other.get(insert_tag) == 'after': if child_other.get(value_tag) is None: _inserterror('after', self.device.get_xpath(child_other), 'value') siblings = node_sum.findall(child_other.tag) sibling = [s for s in siblings if s.text == child_other.get(value_tag)] if not sibling: path = self.device.get_xpath(child_other) value = child_other.get(value_tag) _inserterror('after', path, 'value', value) sibling[0].addnext(self._del_attrib(e)) else: if siblings: siblings[-1].addnext(self._del_attrib(e)) else: node_sum.append(self._del_attrib(e)) def _node_add_without_peer_container(self, node_sum, child_other): '''_node_add_without_peer_container Low-level api: Apply delta child_other to node_sum when there is no peer of child_other can be found under node_sum. child_other is a container node. Element node_sum will be modified during the process. Parameters ---------- node_sum : `Element` A config node in a config tree. child_other : `Element` A child of a config node in another config tree. This child has no peer under node_sum. Returns ------- None There is no return of this method. ''' this_operation = child_other.get(operation_tag, default='merge') if this_operation == 'merge': e = etree.SubElement(node_sum, child_other.tag, nsmap=child_other.nsmap) self.node_add(e, child_other) elif ( this_operation == 'replace' or this_operation == 'create' ): node_sum.append(self._del_attrib(deepcopy(child_other))) def _node_add_without_peer_list(self, node_sum, child_other): '''_node_add_without_peer_list Low-level api: Apply delta child_other to node_sum when there is no peer of child_other can be found under node_sum. child_other is a list node. Element node_sum will be modified during the process. Parameters ---------- node_sum : `Element` A config node in a config tree. child_other : `Element` A child of a config node in another config tree. This child has no peer under node_sum. Returns ------- None There is no return of this method. ''' s_node = self.device.get_schema_node(child_other) this_operation = child_other.get(operation_tag, default='merge') if this_operation == 'merge': e = etree.Element(child_other.tag, nsmap=child_other.nsmap) for list_key_tag in self._get_list_keys(s_node): key_ele_other = child_other.find(list_key_tag) key_ele_self = deepcopy(key_ele_other) e.append(self._del_attrib(key_ele_self)) elif ( this_operation == 'replace' or this_operation == 'create' ): e = self._del_attrib(deepcopy(child_other)) scope = node_sum.getchildren() siblings = self._get_sequence(scope, child_other.tag, node_sum) if s_node.get('ordered-by') == 'user' and \ child_other.get(insert_tag) is not None: if child_other.get(insert_tag) == 'first': if siblings: siblings[0].addprevious(e) else: node_sum.append(e) elif child_other.get(insert_tag) == 'last': if siblings: siblings[-1].addnext(e) else: node_sum.append(e) elif child_other.get(insert_tag) == 'before': if child_other.get(key_tag) is None: _inserterror('before', self.device.get_xpath(child_other), 'key') sibling = node_sum.find(child_other.tag + child_other.get(key_tag), namespaces=child_other.nsmap) if sibling is None: path = self.device.get_xpath(child_other) key = child_other.get(key_tag) _inserterror('before', path, 'key', key) sibling.addprevious(e) elif child_other.get(insert_tag) == 'after': if child_other.get(key_tag) is None: _inserterror('after', self.device.get_xpath(child_other), 'key') sibling = node_sum.find(child_other.tag + child_other.get(key_tag), namespaces=child_other.nsmap) if sibling is None: path = self.device.get_xpath(child_other) key = child_other.get(key_tag) _inserterror('after', path, 'key', key) sibling.addnext(e) else: if siblings: siblings[-1].addnext(e) else: node_sum.append(e) if this_operation == 'merge': self.node_add(e, child_other) def _node_add_with_peer_leaf(self, child_self, child_other): '''_node_add_with_peer_leaf Low-level api: Apply delta child_other to child_self when child_self is the peer of child_other. Element child_self and child_other are leaf nodes. Element child_self will be modified during the process. RFC6020 section 7.6.7 is a reference of this method. Parameters ---------- child_self : `Element` A child of a config node in a config tree. child_other : `Element` A child of a config node in another config tree. child_self is the peer of child_other. Returns ------- None There is no return of this method. ''' this_operation = child_other.get(operation_tag, default='merge') if this_operation == 'merge': self._merge_text(child_other, child_self) elif this_operation == 'replace': self._merge_text(child_other, child_self) elif this_operation == 'create': raise ConfigDeltaError('data-exists: try to create node {} but ' 'it already exists' .format(self.device.get_xpath(child_other))) elif ( this_operation == 'delete' or this_operation == 'remove' ): parent_self = child_self.getparent() parent_self.remove(child_self) else: raise ConfigDeltaError("unknown operation: node {} contains " "operation '{}'" .format(self.device.get_xpath(child_other), this_operation)) def _node_add_with_peer_leaflist(self, child_self, child_other): '''_node_add_with_peer_leaflist Low-level api: Apply delta child_other to child_self when child_self is the peer of child_other. Element child_self and child_other are leaf-list nodes. Element child_self will be modified during the process. RFC6020 section 7.7.7 is a reference of this method. Parameters ---------- child_self : `Element` A child of a config node in a config tree. child_other : `Element` A child of a config node in another config tree. child_self is the peer of child_other. Returns ------- None There is no return of this method. ''' parent_self = child_self.getparent() s_node = self.device.get_schema_node(child_self) this_operation = child_other.get(operation_tag, default='merge') if this_operation == 'merge' or \ this_operation == 'replace': if s_node.get('ordered-by') == 'user' and \ child_other.get(insert_tag) is not None: if child_other.get(insert_tag) == 'first': scope = parent_self.getchildren() siblings = self._get_sequence(scope, child_other.tag, parent_self) if siblings[0] != child_self: siblings[0].addprevious(child_self) elif child_other.get(insert_tag) == 'last': scope = parent_self.getchildren() siblings = self._get_sequence(scope, child_other.tag, parent_self) if siblings[-1] != child_self: siblings[-1].addnext(child_self) elif child_other.get(insert_tag) == 'before': if child_other.get(value_tag) is None: _inserterror('before', self.device.get_xpath(child_other), 'value') siblings = parent_self.findall(child_other.tag) sibling = [s for s in siblings if s.text == child_other.get(value_tag)] if not sibling: path = self.device.get_xpath(child_other) value = child_other.get(value_tag) _inserterror('before', path, 'value', value) if sibling[0] != child_self: sibling[0].addprevious(child_self) elif child_other.get(insert_tag) == 'after': if child_other.get(value_tag) is None: _inserterror('after', self.device.get_xpath(child_other), 'value') siblings = parent_self.findall(child_other.tag) sibling = [s for s in siblings if s.text == child_other.get(value_tag)] if not sibling: path = self.device.get_xpath(child_other) value = child_other.get(value_tag) _inserterror('after', path, 'value', value) if sibling[0] != child_self: sibling[0].addnext(child_self) elif this_operation == 'create': raise ConfigDeltaError('data-exists: try to create node {} but ' 'it already exists' .format(self.device.get_xpath(child_other))) elif ( this_operation == 'delete' or this_operation == 'remove' ): parent_self.remove(child_self) else: raise ConfigDeltaError("unknown operation: node {} contains " "operation '{}'" .format(self.device.get_xpath(child_other), this_operation)) def _node_add_with_peer_container(self, child_self, child_other): '''_node_add_with_peer_container Low-level api: Apply delta child_other to child_self when child_self is the peer of child_other. Element child_self and child_other are container nodes. Element child_self will be modified during the process. RFC6020 section 7.5.8 is a reference of this method. Parameters ---------- child_self : `Element` A child of a config node in a config tree. child_other : `Element` A child of a config node in another config tree. child_self is the peer of child_other. Returns ------- None There is no return of this method. ''' parent_self = child_self.getparent() this_operation = child_other.get(operation_tag, default='merge') if this_operation == 'merge': self.node_add(child_self, child_other) elif this_operation == 'replace': parent_self.replace(child_self, self._del_attrib(deepcopy(child_other))) elif this_operation == 'create': raise ConfigDeltaError('data-exists: try to create node {} but ' 'it already exists' .format(self.device.get_xpath(child_other))) elif ( this_operation == 'delete' or this_operation == 'remove' ): parent_self.remove(child_self) else: raise ConfigDeltaError("unknown operation: node {} contains " "operation '{}'" .format(self.device.get_xpath(child_other), this_operation)) def _node_add_with_peer_list(self, child_self, child_other): '''_node_add_with_peer_list Low-level api: Apply delta child_other to child_self when child_self is the peer of child_other. Element child_self and child_other are list nodes. Element child_self will be modified during the process. RFC6020 section 7.8.6 is a reference of this method. Parameters ---------- child_self : `Element` A child of a config node in a config tree. child_other : `Element` A child of a config node in another config tree. child_self is the peer of child_other. Returns ------- None There is no return of this method. ''' parent_self = child_self.getparent() s_node = self.device.get_schema_node(child_self) this_operation = child_other.get(operation_tag, default='merge') if this_operation != 'delete' and \ this_operation != 'remove' and \ s_node.get('ordered-by') == 'user' and \ child_other.get(insert_tag) is not None: if child_other.get(insert_tag) == 'first': scope = parent_self.getchildren() siblings = self._get_sequence(scope, child_other.tag, parent_self) if siblings[0] != child_self: siblings[0].addprevious(child_self) elif child_other.get(insert_tag) == 'last': scope = parent_self.getchildren() siblings = self._get_sequence(scope, child_other.tag, parent_self) if siblings[-1] != child_self: siblings[-1].addnext(child_self) elif child_other.get(insert_tag) == 'before': if child_other.get(key_tag) is None: _inserterror('before', self.device.get_xpath(child_other), 'key') sibling = parent_self.find(child_other.tag + child_other.get(key_tag), namespaces=child_other.nsmap) if sibling is None: path = self.device.get_xpath(child_other) key = child_other.get(key_tag) _inserterror('before', path, 'key', key) if sibling != child_self: sibling.addprevious(child_self) elif child_other.get(insert_tag) == 'after': if child_other.get(key_tag) is None: _inserterror('after', self.device.get_xpath(child_other), 'key') sibling = parent_self.find(child_other.tag + child_other.get(key_tag), namespaces=child_other.nsmap) if sibling is None: path = self.device.get_xpath(child_other) key = child_other.get(key_tag) _inserterror('after', path, 'key', key) if sibling != child_self: sibling.addnext(child_self) if this_operation == 'merge': self.node_add(child_self, child_other) elif this_operation == 'replace': parent_self.replace(child_self, self._del_attrib(deepcopy(child_other))) elif this_operation == 'create': raise ConfigDeltaError('data-exists: try to create node {} but ' 'it already exists' .format(self.device.get_xpath(child_other))) elif ( this_operation == 'delete' or this_operation == 'remove' ): parent_self.remove(child_self) else: raise ConfigDeltaError("unknown operation: node {} contains " "operation '{}'" .format(self.device.get_xpath(child_other), this_operation))
[docs] def node_sub(self, node_self, node_other): '''node_sub Low-level api: Compute the delta of two configs. This method is recursive. Assume two configs are different. Parameters ---------- node_self : `Element` A config node in a config tree that is being processed. node_self cannot be a leaf node. node_other : `Element` A config node in another config tree that is being processed. Returns ------- None There is no return of this method. ''' def same_leaf_list(tag): list_self = [c for c in list(node_self) if c.tag == tag] list_other = [c for c in list(node_other) if c.tag == tag] s_node = self.device.get_schema_node((list_self + list_other)[0]) if s_node.get('ordered-by') == 'user': if [self._parse_text(i, s_node) for i in list_self] == \ [self._parse_text(i, s_node) for i in list_other]: return True else: return False else: if set([self._parse_text(i, s_node) for i in list_self]) == \ set([self._parse_text(i, s_node) for i in list_other]): return True else: return False if self.preferred_replace != 'merge': t_self = [ c.tag for c in list(node_self) if self.device.get_schema_node(c).get('type') == 'leaf-list' ] t_other = [ c.tag for c in list(node_other) if self.device.get_schema_node(c).get('type') == 'leaf-list' ] commonalities = set(t_self) & set(t_other) for commonality in commonalities: if not same_leaf_list(commonality): node_self.set(operation_tag, 'replace') node_other.set(operation_tag, 'replace') return in_s_not_in_o, in_o_not_in_s, in_s_and_in_o = \ self._group_kids(node_self, node_other) ordered_by_user = {} for child_self in in_s_not_in_o: child_other = etree.Element(child_self.tag, {operation_tag: self.preferred_delete}, nsmap=child_self.nsmap) if self.preferred_create != 'merge': child_self.set(operation_tag, self.preferred_create) siblings = list(node_other.iterchildren(tag=child_self.tag)) if siblings: siblings[-1].addnext(child_other) else: node_other.append(child_other) s_node = self.device.get_schema_node(child_self) if s_node.get('type') == 'leaf-list': if s_node.get('ordered-by') == 'user' and \ s_node.tag not in ordered_by_user: ordered_by_user[s_node.tag] = 'leaf-list' self._merge_text(child_self, child_other) elif s_node.get('type') == 'list': keys = self._get_list_keys(s_node) if s_node.get('ordered-by') == 'user' and \ s_node.tag not in ordered_by_user: ordered_by_user[s_node.tag] = keys for key in keys: key_node = child_self.find(key) e = etree.SubElement( child_other, key, nsmap=key_node.nsmap) e.text = key_node.text for child_other in in_o_not_in_s: child_self = etree.Element(child_other.tag, {operation_tag: self.preferred_delete}, nsmap=child_other.nsmap) if self.preferred_create != 'merge': child_other.set(operation_tag, self.preferred_create) siblings = list(node_self.iterchildren(tag=child_other.tag)) if siblings: siblings[-1].addnext(child_self) else: node_self.append(child_self) s_node = self.device.get_schema_node(child_other) if s_node.get('type') == 'leaf-list': if s_node.get('ordered-by') == 'user' and \ s_node.tag not in ordered_by_user: ordered_by_user[s_node.tag] = 'leaf-list' self._merge_text(child_other, child_self) elif s_node.get('type') == 'list': keys = self._get_list_keys(s_node) if s_node.get('ordered-by') == 'user' and \ s_node.tag not in ordered_by_user: ordered_by_user[s_node.tag] = keys for key in keys: key_node = child_other.find(key) e = etree.SubElement(child_self, key, nsmap=key_node.nsmap) e.text = key_node.text for child_self, child_other in in_s_and_in_o: s_node = self.device.get_schema_node(child_self) if s_node.get('type') == 'leaf': if self._same_text(child_self, child_other): if not s_node.get('is_key'): node_self.remove(child_self) node_other.remove(child_other) else: if self.preferred_replace != 'merge': child_self.set(operation_tag, self.preferred_replace) child_other.set(operation_tag, self.preferred_replace) elif s_node.get('type') == 'leaf-list': if s_node.get('ordered-by') == 'user': if s_node.tag not in ordered_by_user: ordered_by_user[s_node.tag] = 'leaf-list' else: node_self.remove(child_self) node_other.remove(child_other) elif s_node.get('type') == 'container': if self._node_le(child_self, child_other) and \ self._node_le(child_other, child_self): node_self.remove(child_self) node_other.remove(child_other) else: self.node_sub(child_self, child_other) elif s_node.get('type') == 'list': if s_node.get('ordered-by') == 'user' and \ s_node.tag not in ordered_by_user: ordered_by_user[s_node.tag] = self._get_list_keys(s_node) if self._node_le(child_self, child_other) and \ self._node_le(child_other, child_self): if s_node.get('ordered-by') == 'user': for child in child_self.getchildren(): schema_node = self.device.get_schema_node(child) if not schema_node.get('is_key'): child_self.remove(child) for child in child_other.getchildren(): schema_node = self.device.get_schema_node(child) if not schema_node.get('is_key'): child_other.remove(child) else: node_self.remove(child_self) node_other.remove(child_other) else: self.node_sub(child_self, child_other) else: path = self.device.get_xpath(s_node) raise ModelError("unknown schema node type: type of node {}" "is '{}'".format(path, s_node.get('type'))) for tag in ordered_by_user: scope_s = in_s_not_in_o + in_s_and_in_o scope_o = in_o_not_in_s + in_s_and_in_o for sequence in ( self._get_sequence(scope_s, tag, node_self), self._get_sequence(scope_o, tag, node_other), ): for item in sequence: # modifying the namespace mapping of a node is not possible # in lxml. See https://bugs.launchpad.net/lxml/+bug/555602 # if 'yang' not in item.nsmap: # item.nsmap['yang'] = yang_url i = sequence.index(item) if i == 0: item.set(insert_tag, 'first') else: item.set(insert_tag, 'after') precursor = sequence[i - 1] if ordered_by_user[tag] == 'leaf-list': item.set(value_tag, precursor.text) else: keys = ordered_by_user[tag] key_nodes = {k: precursor.find(k) for k in keys} ids = { k: self._url_to_prefix(n, k) for k, n in key_nodes.items() } id_list = [ "[{}='{}']".format(ids[k], key_nodes[k].text) for k in keys ] item.set(key_tag, ''.join(id_list))
@staticmethod def _url_to_prefix(node, id): '''_url_to_prefix Low-level api: Convert an identifier from `{namespace}tagname` notation to `prefix:tagname` notation by looking at nsmap of the node. If the identifier does not have a namespace, the identifier is simply returned without modification. Parameters ---------- node : `str` A config node. Its identifier will be converted. id : `str` Identifier in `{namespace}tagname` notation. Returns ------- str Identifier in `prefix:tagname` notation. ''' prefixes = {v: k for k, v in node.nsmap.items()} ret = re.search('^{(.+)}(.+)$', id) if ret: if ret.group(1) in prefixes: if prefixes[ret.group(1)] is None: return ret.group(2) else: return prefixes[ret.group(1)] + ':' + ret.group(2) return id