| # -*- coding: utf-8 -*- |
| """ |
| Device Tree Blob Parser |
| |
| Copyright 2014 Neil 'superna' Armstrong <superna9999@gmail.com> |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| |
| @author: Neil 'superna' Armstrong <superna9999@gmail.com> |
| """ |
| |
| from __future__ import print_function |
| from __future__ import absolute_import |
| |
| import string |
| import os |
| import json |
| from copy import deepcopy, copy |
| from struct import Struct, unpack, pack |
| |
| FDT_MAGIC = 0xd00dfeed |
| FDT_BEGIN_NODE = 0x1 |
| FDT_END_NODE = 0x2 |
| FDT_PROP = 0x3 |
| FDT_NOP = 0x4 |
| FDT_END = 0x9 |
| |
| INDENT = ' ' * 4 |
| |
| FDT_MAX_VERSION = 17 |
| |
| |
| class FdtProperty(object): |
|     """Represents an empty property""" |
| |
| @staticmethod |
| def __validate_dt_name(name): |
| """Checks the name validity""" |
| return not any([True for char in name |
| if char not in string.printable]) |
| |
| def __init__(self, name): |
| """Init with name""" |
| self.name = name |
| if not FdtProperty.__validate_dt_name(self.name): |
| raise Exception("Invalid name '%s'" % self.name) |
| |
| def get_name(self): |
| """Get property name""" |
| return self.name |
| |
| def __str__(self): |
| """String representation""" |
| return "Property(%s)" % self.name |
| |
| def dts_represent(self, depth=0): |
| """Get dts string representation""" |
| return INDENT*depth + self.name + ';' |
| |
| def dtb_represent(self, string_store, pos=0, version=17): |
| """Get blob representation""" |
| # print "%x:%s" % (pos, self) |
| strpos = string_store.find(self.name+'\0') |
| if strpos < 0: |
| strpos = len(string_store) |
| string_store += self.name+'\0' |
| pos += 12 |
| return (pack('>III', FDT_PROP, 0, strpos), |
| string_store, pos) |
| |
| def json_represent(self, depth=0): |
|         """Output JSON representation""" |
| return '%s: null' % json.dumps(self.name) |
| |
| def to_raw(self): |
| """Return RAW value representation""" |
| return '' |
| |
| def __getitem__(self, value): |
|         """An empty property has no items; always returns None""" |
| return None |
| |
| def __ne__(self, node): |
| """Check property inequality |
| """ |
| return not self.__eq__(node) |
| |
| def __eq__(self, node): |
| """Check node equality |
| check properties are the same (same values) |
| """ |
| if not isinstance(node, FdtProperty): |
| raise Exception("Invalid object type") |
| if self.name != node.get_name(): |
| return False |
| return True |
| |
| @staticmethod |
| def __check_prop_strings(value): |
| """Check property string validity |
| Python version of util_is_printable_string from dtc |
| """ |
| pos = 0 |
| posi = 0 |
| end = len(value) |
| |
| if not len(value): |
| return None |
| |
| #Needed for python 3 support: If a bytes object is passed, |
| #decode it with the ascii codec. If the decoding fails, assume |
| #it was not a string object. |
| try: |
| value = value.decode('ascii') |
| except ValueError: |
| return None |
| |
|         # The value must end with a NUL ('\0') terminator |
|         if value[-1] != '\0': |
|             return None |
| |
| while pos < end: |
| posi = pos |
| while pos < end and value[pos] != '\0' \ |
| and value[pos] in string.printable \ |
| and value[pos] not in ('\r', '\n'): |
| pos += 1 |
| |
| if value[pos] != '\0' or pos == posi: |
| return None |
| pos += 1 |
| |
| return True |
| |
| @staticmethod |
| def new_raw_property(name, raw_value): |
| """Instantiate property with raw value type""" |
| if FdtProperty.__check_prop_strings(raw_value): |
| return FdtPropertyStrings.init_raw(name, raw_value) |
| elif len(raw_value) and len(raw_value) % 4 == 0: |
| return FdtPropertyWords.init_raw(name, raw_value) |
|         elif len(raw_value): |
| return FdtPropertyBytes.init_raw(name, raw_value) |
| else: |
| return FdtProperty(name) |
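|  |
| # Illustrative sketch (values are made up, not from a real device tree) of how |
| # new_raw_property() picks a property subclass from the shape of the raw value: |
| # |
| #   FdtProperty.new_raw_property('status', b'okay\0')         # -> FdtPropertyStrings |
| #   FdtProperty.new_raw_property('reg', b'\x00\x00\x00\x01')  # -> FdtPropertyWords |
| #   FdtProperty.new_raw_property('mac', b'\x00\x11\x22')      # -> FdtPropertyBytes |
| #   FdtProperty.new_raw_property('ranges', b'')               # -> FdtProperty (empty) |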
| |
| |
| class FdtPropertyStrings(FdtProperty): |
| """Property with strings as value""" |
| |
| @classmethod |
| def __extract_prop_strings(cls, value): |
| """Extract strings from raw_value""" |
| return [st for st in \ |
| value.decode('ascii').split('\0') if len(st)] |
| |
| def __init__(self, name, strings): |
| """Init with strings""" |
| FdtProperty.__init__(self, name) |
| if not strings: |
| raise Exception("Invalid strings") |
| for stri in strings: |
| if len(stri) == 0: |
| raise Exception("Invalid strings") |
| if any([True for char in stri |
| if char not in string.printable |
| or char in ('\r', '\n')]): |
| raise Exception("Invalid chars in strings") |
| self.strings = strings |
| |
| @classmethod |
| def init_raw(cls, name, raw_value): |
| """Init from raw""" |
| return cls(name, cls.__extract_prop_strings(raw_value)) |
| |
| def dts_represent(self, depth=0): |
| """Get dts string representation""" |
| return INDENT*depth + self.name + ' = "' + \ |
| '", "'.join(self.strings) + '";' |
| |
| def dtb_represent(self, string_store, pos=0, version=17): |
| """Get blob representation""" |
| # print "%x:%s" % (pos, self) |
| blob = pack('') |
| for chars in self.strings: |
| blob += chars.encode('ascii') + pack('b', 0) |
| blob_len = len(blob) |
| if version < 16 and (pos+12) % 8 != 0: |
| blob = pack('b', 0) * (8-((pos+12) % 8)) + blob |
| if blob_len % 4: |
| blob += pack('b', 0) * (4-(blob_len % 4)) |
| strpos = string_store.find(self.name+'\0') |
| if strpos < 0: |
| strpos = len(string_store) |
| string_store += self.name+'\0' |
| blob = pack('>III', FDT_PROP, blob_len, strpos) + blob |
| pos += len(blob) |
| return (blob, string_store, pos) |
| |
| def json_represent(self, depth=0): |
|         """Output JSON representation""" |
| result = '%s: ["strings", ' % json.dumps(self.name) |
| result += ', '.join([json.dumps(stri) for stri in self.strings]) |
| result += ']' |
| return result |
| |
| def to_raw(self): |
| """Return RAW value representation""" |
| return ''.join([chars+'\0' for chars in self.strings]) |
| |
| def __str__(self): |
| """String representation""" |
| return "Property(%s,Strings:%s)" % (self.name, self.strings) |
| |
| def __getitem__(self, index): |
| """Get strings, returns a string""" |
| return self.strings[index] |
| |
| def __len__(self): |
| """Get strings count""" |
| return len(self.strings) |
| |
| def __eq__(self, node): |
| """Check node equality |
| check properties are the same (same values) |
| """ |
| if not FdtProperty.__eq__(self, node): |
| return False |
| if self.__len__() != len(node): |
| return False |
| for index in range(self.__len__()): |
| if self.strings[index] != node[index]: |
| return False |
| return True |
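|  |
| # Minimal, hypothetical usage sketch for string-list properties: |
| # |
| #   prop = FdtPropertyStrings('compatible', ['acme,board', 'simple-bus']) |
| #   prop.dts_represent()   # -> 'compatible = "acme,board", "simple-bus";' |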
| |
| class FdtPropertyWords(FdtProperty): |
| """Property with words as value""" |
| |
| def __init__(self, name, words): |
| """Init with words""" |
| FdtProperty.__init__(self, name) |
| for word in words: |
| if not 0 <= word <= 4294967295: |
| raise Exception(("Invalid word value %d, requires " + |
| "0 <= number <= 4294967295") % word) |
| if not len(words): |
| raise Exception("Invalid Words") |
| self.words = words |
| |
| @classmethod |
| def init_raw(cls, name, raw_value): |
| """Init from raw""" |
| if len(raw_value) % 4 == 0: |
| words = [unpack(">I", raw_value[i:i+4])[0] |
| for i in range(0, len(raw_value), 4)] |
| return cls(name, words) |
| else: |
| raise Exception("Invalid raw Words") |
| |
| def dts_represent(self, depth=0): |
| """Get dts string representation""" |
| return INDENT*depth + self.name + ' = <' + \ |
| ' '.join(["0x%08x" % word for word in self.words]) + ">;" |
| |
| def dtb_represent(self, string_store, pos=0, version=17): |
| """Get blob representation""" |
| # # print "%x:%s" % (pos, self) |
| strpos = string_store.find(self.name+'\0') |
| if strpos < 0: |
| strpos = len(string_store) |
| string_store += self.name+'\0' |
| blob = pack('>III', FDT_PROP, len(self.words)*4, strpos) + \ |
| pack('').join([pack('>I', word) for word in self.words]) |
| pos += len(blob) |
| return (blob, string_store, pos) |
| |
| def json_represent(self, depth=0): |
|         """Output JSON representation""" |
| result = '%s: ["words", "' % json.dumps(self.name) |
| result += '", "'.join(["0x%08x" % word for word in self.words]) |
| result += '"]' |
| return result |
| |
| def to_raw(self): |
| """Return RAW value representation""" |
|         # join as bytes so this also works on Python 3 |
|         return pack('').join([pack('>I', word) for word in self.words]) |
| |
| def __str__(self): |
| """String representation""" |
| return "Property(%s,Words:%s)" % (self.name, self.words) |
| |
| def __getitem__(self, index): |
| """Get words, returns a word integer""" |
| return self.words[index] |
| |
| def __len__(self): |
| """Get words count""" |
| return len(self.words) |
| |
| def __eq__(self, node): |
| """Check node equality |
| check properties are the same (same values) |
| """ |
| if not FdtProperty.__eq__(self, node): |
| return False |
| if self.__len__() != len(node): |
| return False |
| for index in range(self.__len__()): |
| if self.words[index] != node[index]: |
| return False |
| return True |
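|  |
| # Minimal, hypothetical usage sketch for 32-bit word (cell) properties: |
| # |
| #   prop = FdtPropertyWords('reg', [0x80000000, 0x10000]) |
| #   prop.dts_represent()   # -> 'reg = <0x80000000 0x00010000>;' |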
| |
| |
| class FdtPropertyBytes(FdtProperty): |
| """Property with signed bytes as value""" |
| |
| def __init__(self, name, bytez): |
| """Init with bytes""" |
| FdtProperty.__init__(self, name) |
| for byte in bytez: |
| if not -128 <= byte <= 127: |
| raise Exception(("Invalid value for byte %d, " + |
| "requires -128 <= number <= 127") % byte) |
| if not bytez: |
| raise Exception("Invalid Bytes") |
| self.bytes = bytez |
| |
| @classmethod |
| def init_raw(cls, name, raw_value): |
| """Init from raw""" |
| return cls(name, unpack('b' * len(raw_value), raw_value)) |
| |
| def dts_represent(self, depth=0): |
| """Get dts string representation""" |
| return INDENT*depth + self.name + ' = [' + \ |
|                ' '.join(["%02x" % (byte & 0xff) |
| for byte in self.bytes]) + "];" |
| |
| def dtb_represent(self, string_store, pos=0, version=17): |
| """Get blob representation""" |
| # print "%x:%s" % (pos, self) |
| strpos = string_store.find(self.name+'\0') |
| if strpos < 0: |
| strpos = len(string_store) |
| string_store += self.name+'\0' |
| blob = pack('>III', FDT_PROP, len(self.bytes), strpos) |
| blob += pack('').join([pack('>b', byte) for byte in self.bytes]) |
| if len(blob) % 4: |
| blob += pack('b', 0) * (4-(len(blob) % 4)) |
| pos += len(blob) |
| return (blob, string_store, pos) |
| |
| def json_represent(self, depth=0): |
|         """Output JSON representation""" |
| result = '%s: ["bytes", "' % json.dumps(self.name) |
| result += '", "'.join(["%02x" % byte |
| for byte in self.bytes]) |
| result += '"]' |
| return result |
| |
| def to_raw(self): |
| """Return RAW value representation""" |
|         # join as bytes so this also works on Python 3 |
|         return pack('').join([pack('>b', byte) for byte in self.bytes]) |
| |
| def __str__(self): |
| """String representation""" |
| return "Property(%s,Bytes:%s)" % (self.name, self.bytes) |
| |
| def __getitem__(self, index): |
| """Get bytes, returns a byte""" |
| return self.bytes[index] |
| |
| def __len__(self): |
|         """Get bytes count""" |
| return len(self.bytes) |
| |
| def __eq__(self, node): |
| """Check node equality |
| check properties are the same (same values) |
| """ |
| if not FdtProperty.__eq__(self, node): |
| return False |
| if self.__len__() != len(node): |
| return False |
| for index in range(self.__len__()): |
| if self.bytes[index] != node[index]: |
| return False |
| return True |
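|  |
| # Minimal, hypothetical usage sketch for byte-array properties (values are |
| # signed, -128..127): |
| # |
| #   prop = FdtPropertyBytes('local-mac-address', [0x00, 0x11, 0x22, 0x33, 0x44, 0x55]) |
| #   prop.dts_represent()   # -> 'local-mac-address = [00 11 22 33 44 55];' |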
| |
| |
| class FdtNop(object): # pylint: disable-msg=R0903 |
| """Nop child representation""" |
| |
| def __init__(self): |
| """Init with nothing""" |
| |
| def get_name(self): # pylint: disable-msg=R0201 |
| """Return name""" |
| return None |
| |
| def __str__(self): |
| """String representation""" |
| return '' |
| |
| def dts_represent(self, depth=0): # pylint: disable-msg=R0201 |
| """Get dts string representation""" |
| return INDENT*depth+'// [NOP]' |
| |
| def dtb_represent(self, string_store, pos=0, version=17): |
| """Get blob representation""" |
| # print "%x:%s" % (pos, self) |
| pos += 4 |
| return (pack('>I', FDT_NOP), string_store, pos) |
| |
| |
| class FdtNode(object): |
| """Node representation""" |
| |
| @staticmethod |
| def __validate_dt_name(name): |
| """Checks the name validity""" |
| return not any([True for char in name |
| if char not in string.printable]) |
| |
| def __init__(self, name): |
| """Init node with name""" |
| self.name = name |
| self.subdata = [] |
| self.parent = None |
| if not FdtNode.__validate_dt_name(self.name): |
| raise Exception("Invalid name '%s'" % self.name) |
| |
| def get_name(self): |
| """Get property name""" |
| return self.name |
| |
| def __check_name_duplicate(self, name): |
|         """Return True if a subnode with this name already exists""" |
| for data in self.subdata: |
| if not isinstance(data, FdtNop) \ |
| and data.get_name() == name: |
| return True |
| return False |
| |
| def add_subnode(self, node): |
|         """Add child; deprecated, use append() instead""" |
| self.append(node) |
| |
| def add_raw_attribute(self, name, raw_value): |
| """Construct a raw attribute and add to child""" |
| self.append(FdtProperty.new_raw_property(name, raw_value)) |
| |
| def set_parent_node(self, node): |
| """Set parent node, None and FdtNode accepted""" |
| if node is not None and \ |
| not isinstance(node, FdtNode): |
| raise Exception("Invalid object type") |
| self.parent = node |
| |
| def get_parent_node(self): |
| """Get parent node""" |
| return self.parent |
| |
| def __str__(self): |
| """String representation""" |
| return "Node(%s)" % self.name |
| |
| def dts_represent(self, depth=0): |
| """Get dts string representation""" |
| result = ('\n').join([sub.dts_represent(depth+1) |
| for sub in self.subdata]) |
| if len(result) > 0: |
| result += '\n' |
| return INDENT*depth + self.name + ' {\n' + \ |
| result + INDENT*depth + "};" |
| |
| def dtb_represent(self, strings_store, pos=0, version=17): |
| """Get blob representation |
| Pass string storage as strings_store, pos for current node start |
| and version as current dtb version |
| """ |
| # print "%x:%s" % (pos, self) |
| strings = strings_store |
| if self.get_name() == '/': |
| blob = pack('>II', FDT_BEGIN_NODE, 0) |
| else: |
| blob = pack('>I', FDT_BEGIN_NODE) |
| blob += self.get_name().encode('ascii') + pack('b', 0) |
| if len(blob) % 4: |
| blob += pack('b', 0) * (4-(len(blob) % 4)) |
| pos += len(blob) |
| for sub in self.subdata: |
| (data, strings, pos) = sub.dtb_represent(strings, pos, version) |
| blob += data |
| pos += 4 |
| blob += pack('>I', FDT_END_NODE) |
| return (blob, strings, pos) |
| |
| def json_represent(self, depth=0): |
|         """Get JSON string representation""" |
| result = (',\n'+ \ |
| INDENT*(depth+1)).join([sub.json_represent(depth+1) |
| for sub in self.subdata |
| if not isinstance(sub, FdtNop)]) |
| if len(result) > 0: |
| result = INDENT + result + '\n'+INDENT*depth |
| if self.get_name() == '/': |
| return "{\n" + INDENT*(depth) + result + "}" |
| else: |
| return json.dumps(self.name) + ': {\n' + \ |
| INDENT*(depth) + result + "}" |
| |
| def __getitem__(self, index): |
| """Get subnodes, returns either a Node, a Property or a Nop""" |
| return self.subdata[index] |
| |
| def __setitem__(self, index, subnode): |
| """Set node at index, replacing previous subnode, |
| must not be a duplicate name |
| """ |
| if self.subdata[index].get_name() != subnode.get_name() and \ |
| self.__check_name_duplicate(subnode.get_name()): |
| raise Exception("%s : %s subnode already exists" % \ |
| (self, subnode)) |
| if not isinstance(subnode, (FdtNode, FdtProperty, FdtNop)): |
| raise Exception("Invalid object type") |
| self.subdata[index] = subnode |
| |
| def __len__(self): |
|         """Get subnodes count""" |
| return len(self.subdata) |
| |
| def __ne__(self, node): |
| """Check node inequality |
|         i.e. subnodes are the same (in any order) |
|         and properties have the same values |
|         FdtNop entries are excluded from the check |
| """ |
| return not self.__eq__(node) |
| |
| def __eq__(self, node): |
| """Check node equality |
|         i.e. subnodes are the same (in any order) |
|         and properties have the same values |
|         FdtNop entries are excluded from the check |
| """ |
| if not isinstance(node, FdtNode): |
| raise Exception("Invalid object type") |
| if self.name != node.get_name(): |
| return False |
| curnames = set([subnode.get_name() for subnode in self.subdata |
| if not isinstance(subnode, FdtNop)]) |
| cmpnames = set([subnode.get_name() for subnode in node |
| if not isinstance(subnode, FdtNop)]) |
| if curnames != cmpnames: |
| return False |
| for subnode in [subnode for subnode in self.subdata |
| if not isinstance(subnode, FdtNop)]: |
| index = node.index(subnode.get_name()) |
| if subnode != node[index]: |
| return False |
| return True |
| |
| def append(self, subnode): |
| """Append subnode, same as add_subnode""" |
| if self.__check_name_duplicate(subnode.get_name()): |
| raise Exception("%s : %s subnode already exists" % \ |
| (self, subnode)) |
| if not isinstance(subnode, (FdtNode, FdtProperty, FdtNop)): |
| raise Exception("Invalid object type") |
| self.subdata.append(subnode) |
| |
| def pop(self, index=-1): |
| """Remove and returns subnode at index, default the last""" |
| return self.subdata.pop(index) |
| |
| def insert(self, index, subnode): |
| """Insert subnode before index, must not be a duplicate name""" |
| if self.__check_name_duplicate(subnode.get_name()): |
| raise Exception("%s : %s subnode already exists" % \ |
| (self, subnode)) |
| if not isinstance(subnode, (FdtNode, FdtProperty, FdtNop)): |
| raise Exception("Invalid object type") |
| self.subdata.insert(index, subnode) |
| |
| def _find(self, name): |
| """Find name in subnodes""" |
| for i in range(0, len(self.subdata)): |
| if not isinstance(self.subdata[i], FdtNop) and \ |
| name == self.subdata[i].get_name(): |
| return i |
| return None |
| |
| def remove(self, name): |
| """Remove subnode with the name |
|         Raises ValueError if not present |
| """ |
| index = self._find(name) |
| if index is None: |
| raise ValueError("Not present") |
| return self.subdata.pop(index) |
| |
| def index(self, name): |
| """Returns position of subnode with the name |
|         Raises ValueError if not present |
| """ |
| index = self._find(name) |
| if index is None: |
| raise ValueError("Not present") |
| return index |
| |
| def merge(self, node): |
| """Merge two nodes and subnodes |
| Replace current properties with the given properties |
| """ |
| if not isinstance(node, FdtNode): |
| raise Exception("Can only merge with a FdtNode") |
| for subnode in [obj for obj in node |
| if isinstance(obj, (FdtNode, FdtProperty))]: |
| index = self._find(subnode.get_name()) |
| if index is None: |
| dup = deepcopy(subnode) |
| if isinstance(subnode, FdtNode): |
| dup.set_parent_node(self) |
| self.append(dup) |
| elif isinstance(subnode, FdtNode): |
| self.subdata[index].merge(subnode) |
| else: |
| self.subdata[index] = copy(subnode) |
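|  |
|     # Sketch of merge() semantics (node names here are made up): properties |
|     # present on both sides are replaced by the incoming ones, new subnodes |
|     # are deep-copied in, and subnodes present on both sides are merged |
|     # recursively, e.g. |
|     # |
|     #   base_node.merge(overlay_node) |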
| |
| def walk(self): |
|         """Recursively walk subnodes, yielding paths and objects |
|         Yields (path string, object) tuples |
| """ |
| node = self |
| start = 0 |
| hist = [] |
| curpath = [] |
| |
| while True: |
| for index in range(start, len(node)): |
| if isinstance(node[index], (FdtNode, FdtProperty)): |
| yield ('/' + '/'.join(curpath+[node[index].get_name()]), |
| node[index]) |
| if isinstance(node[index], FdtNode): |
| if len(node[index]): |
| hist.append((node, index+1)) |
| curpath.append(node[index].get_name()) |
| node = node[index] |
| start = 0 |
| index = -1 |
| break |
| if index >= 0: |
| if len(hist): |
| (node, start) = hist.pop() |
| curpath.pop() |
| else: |
| break |
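|  |
|     # Hypothetical iteration sketch: walk() yields a ('/full/path', object) |
|     # pair for every FdtNode and FdtProperty below this node, e.g. |
|     # |
|     #   for path, obj in some_root_node.walk(): |
|     #       print(path, obj) |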
| |
| |
| class Fdt(object): |
| """Flattened Device Tree representation""" |
| |
| def __init__(self, version=17, last_comp_version=16, boot_cpuid_phys=0): |
| """Init FDT object with version and boot values""" |
| self.header = {'magic': FDT_MAGIC, |
| 'totalsize': 0, |
| 'off_dt_struct': 0, |
| 'off_dt_strings': 0, |
| 'off_mem_rsvmap': 0, |
| 'version': version, |
| 'last_comp_version': last_comp_version, |
| 'boot_cpuid_phys': boot_cpuid_phys, |
| 'size_dt_strings': 0, |
| 'size_dt_struct': 0} |
| self.rootnode = None |
| self.prenops = None |
| self.postnops = None |
| self.reserve_entries = None |
| |
| def add_rootnode(self, rootnode, prenops=None, postnops=None): |
| """Add root node""" |
| self.rootnode = rootnode |
| self.prenops = prenops |
| self.postnops = postnops |
| |
| def get_rootnode(self): |
| """Get root node""" |
| return self.rootnode |
| |
| def add_reserve_entries(self, reserve_entries): |
| """Add reserved entries as list of dict with |
| 'address' and 'size' keys""" |
| self.reserve_entries = reserve_entries |
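|  |
|     # Example of the expected format (address/size values are arbitrary): |
|     # |
|     #   fdt.add_reserve_entries([{'address': 0x40000000, 'size': 0x100000}]) |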
| |
| def to_dts(self): |
| """Export to DTS representation in string format""" |
| result = "/dts-v1/;\n" |
| result += "// version:\t\t%d\n" % self.header['version'] |
| result += "// last_comp_version:\t%d\n" % \ |
| self.header['last_comp_version'] |
| if self.header['version'] >= 2: |
| result += "// boot_cpuid_phys:\t0x%x\n" % \ |
| self.header['boot_cpuid_phys'] |
| result += '\n' |
| if self.reserve_entries is not None: |
| for entry in self.reserve_entries: |
| result += "/memreserve/ " |
| if entry['address']: |
| result += "%#x " % entry['address'] |
| else: |
| result += "0 " |
| if entry['size']: |
| result += "%#x" % entry['size'] |
| else: |
| result += "0" |
| result += ";\n" |
| if self.prenops: |
| result += '\n'.join([nop.dts_represent() for nop in self.prenops]) |
| result += '\n' |
| if self.rootnode is not None: |
| result += self.rootnode.dts_represent() |
| if self.postnops: |
| result += '\n' |
| result += '\n'.join([nop.dts_represent() for nop in self.postnops]) |
| return result |
| |
| def to_dtb(self): |
| """Export to Blob format""" |
| if self.rootnode is None: |
| return None |
| blob_reserve_entries = pack('') |
| if self.reserve_entries is not None: |
| for entry in self.reserve_entries: |
| blob_reserve_entries += pack('>QQ', |
| entry['address'], |
| entry['size']) |
| blob_reserve_entries += pack('>QQ', 0, 0) |
| header_size = 7 * 4 |
| if self.header['version'] >= 2: |
| header_size += 4 |
| if self.header['version'] >= 3: |
| header_size += 4 |
| if self.header['version'] >= 17: |
| header_size += 4 |
| header_adjust = pack('') |
| if header_size % 8 != 0: |
| header_adjust = pack('b', 0) * (8 - (header_size % 8)) |
| header_size += len(header_adjust) |
| dt_start = header_size + len(blob_reserve_entries) |
| # print "dt_start %d" % dt_start |
| (blob_dt, blob_strings, dt_pos) = \ |
| self.rootnode.dtb_represent('', dt_start, self.header['version']) |
| if self.prenops is not None: |
| blob_dt = pack('').join([nop.dtb_represent('')[0] |
| for nop in self.prenops])\ |
| + blob_dt |
| if self.postnops is not None: |
| blob_dt += pack('').join([nop.dtb_represent('')[0] |
| for nop in self.postnops]) |
| blob_dt += pack('>I', FDT_END) |
| self.header['size_dt_strings'] = len(blob_strings) |
| self.header['size_dt_struct'] = len(blob_dt) |
| self.header['off_mem_rsvmap'] = header_size |
| self.header['off_dt_struct'] = dt_start |
| self.header['off_dt_strings'] = dt_start + len(blob_dt) |
| self.header['totalsize'] = dt_start + len(blob_dt) + len(blob_strings) |
| blob_header = pack('>IIIIIII', self.header['magic'], |
| self.header['totalsize'], |
| self.header['off_dt_struct'], |
| self.header['off_dt_strings'], |
| self.header['off_mem_rsvmap'], |
| self.header['version'], |
| self.header['last_comp_version']) |
| if self.header['version'] >= 2: |
| blob_header += pack('>I', self.header['boot_cpuid_phys']) |
| if self.header['version'] >= 3: |
| blob_header += pack('>I', self.header['size_dt_strings']) |
| if self.header['version'] >= 17: |
| blob_header += pack('>I', self.header['size_dt_struct']) |
| return blob_header + header_adjust + blob_reserve_entries + \ |
| blob_dt + blob_strings.encode('ascii') |
| |
| def to_json(self): |
|         """Output JSON representation""" |
| if self.rootnode is None: |
| return None |
| return self.rootnode.json_represent() |
| |
| def resolve_path(self, path): |
| """Resolve path like /memory/reg and return either a FdtNode, |
| a FdtProperty or None""" |
| if self.rootnode is None: |
| return None |
| if not path.startswith('/'): |
| return None |
| if len(path) > 1 and path.endswith('/'): |
| path = path[:-1] |
| if path == '/': |
| return self.rootnode |
| curnode = self.rootnode |
| for subpath in path[1:].split('/'): |
| found = None |
| if not isinstance(curnode, FdtNode): |
| return None |
| for node in curnode: |
| if subpath == node.get_name(): |
| found = node |
| break |
| if found is None: |
| return None |
| curnode = found |
| return curnode |
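|  |
|     # Hypothetical lookup sketch (actual paths depend on the parsed tree): |
|     # |
|     #   fdt.resolve_path('/memory')       # -> FdtNode, or None if absent |
|     #   fdt.resolve_path('/memory/reg')   # -> FdtProperty subclass, or None |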
| |
| def _add_json_to_fdtnode(node, subjson): |
| """Populate FdtNode with JSON dict items""" |
| for (key, value) in subjson.items(): |
| if isinstance(value, dict): |
| subnode = FdtNode(key) |
| subnode.set_parent_node(node) |
| node.append(subnode) |
| _add_json_to_fdtnode(subnode, value) |
| elif isinstance(value, list): |
| if len(value) < 2: |
| raise Exception("Invalid list for %s" % key) |
| if value[0] == "words": |
| words = [int(word, 16) for word in value[1:]] |
| node.append(FdtPropertyWords(key, words)) |
| elif value[0] == "bytes": |
| bytez = [int(byte, 16) for byte in value[1:]] |
| node.append(FdtPropertyBytes(key, bytez)) |
| elif value[0] == "strings": |
| node.append(FdtPropertyStrings(key, \ |
| [s for s in value[1:]])) |
| else: |
| raise Exception("Invalid list for %s" % key) |
| elif value is None: |
| node.append(FdtProperty(key)) |
| else: |
| raise Exception("Invalid value for %s" % key) |
| |
| def FdtJsonParse(buf): |
| """Import FDT from JSON representation, see JSONDeviceTree.md for |
| structure and encoding |
| Returns an Fdt object |
| """ |
| tree = json.loads(buf) |
| |
| root = FdtNode('/') |
| |
| _add_json_to_fdtnode(root, tree) |
| |
| fdt = Fdt() |
| fdt.add_rootnode(root) |
| return fdt |
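|  |
| # Minimal illustrative JSON input for FdtJsonParse() (structure as described in |
| # JSONDeviceTree.md; the node and property values below are invented): |
| # |
| #   fdt = FdtJsonParse('{"memory": {"device_type": ["strings", "memory"], ' |
| #                      '"reg": ["words", "0x0", "0x10000000"]}}') |
| #   print(fdt.to_dts()) |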
| |
| def FdtFsParse(path): |
| """Parse device tree filesystem and return a Fdt instance |
| Should be /proc/device-tree on a device, or the fusemount.py |
| mount point. |
| """ |
| root = FdtNode("/") |
| |
| if path.endswith('/'): |
| path = path[:-1] |
| |
| nodes = {path: root} |
| |
| for subpath, subdirs, files in os.walk(path): |
| if subpath not in nodes.keys(): |
| raise Exception("os.walk error") |
| cur = nodes[subpath] |
| for f in files: |
| with open(subpath+'/'+f, 'rb') as content_file: |
| content = content_file.read() |
| prop = FdtProperty.new_raw_property(f, content) |
| cur.add_subnode(prop) |
| for subdir in subdirs: |
| subnode = FdtNode(subdir) |
| cur.add_subnode(subnode) |
| subnode.set_parent_node(cur) |
| nodes[subpath+'/'+subdir] = subnode |
| |
| fdt = Fdt() |
| fdt.add_rootnode(root) |
| return fdt |
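|  |
| # Typical (hypothetical) usage on a running Linux device: |
| # |
| #   fdt = FdtFsParse('/proc/device-tree') |
| #   print(fdt.to_dts()) |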
| |
| class FdtBlobParse(object): # pylint: disable-msg=R0903 |
| """Parse from file input""" |
| |
| __fdt_header_format = ">IIIIIII" |
| __fdt_header_names = ('magic', 'totalsize', 'off_dt_struct', |
| 'off_dt_strings', 'off_mem_rsvmap', 'version', |
| 'last_comp_version') |
| |
| __fdt_reserve_entry_format = ">QQ" |
| __fdt_reserve_entry_names = ('address', 'size') |
| |
| __fdt_dt_cell_format = ">I" |
| __fdt_dt_prop_format = ">II" |
| __fdt_dt_tag_name = {FDT_BEGIN_NODE: 'node_begin', |
| FDT_END_NODE: 'node_end', |
| FDT_PROP: 'prop', |
| FDT_NOP: 'nop', |
| FDT_END: 'end'} |
| |
| def __extract_fdt_header(self): |
| """Extract DTB header""" |
| header = Struct(self.__fdt_header_format) |
| header_entry = Struct(">I") |
| data = self.infile.read(header.size) |
| result = dict(zip(self.__fdt_header_names, header.unpack_from(data))) |
| if result['version'] >= 2: |
| data = self.infile.read(header_entry.size) |
| result['boot_cpuid_phys'] = header_entry.unpack_from(data)[0] |
| if result['version'] >= 3: |
| data = self.infile.read(header_entry.size) |
| result['size_dt_strings'] = header_entry.unpack_from(data)[0] |
| if result['version'] >= 17: |
| data = self.infile.read(header_entry.size) |
| result['size_dt_struct'] = header_entry.unpack_from(data)[0] |
| return result |
| |
| def __extract_fdt_reserve_entries(self): |
| """Extract reserved memory entries""" |
| header = Struct(self.__fdt_reserve_entry_format) |
| entries = [] |
| self.infile.seek(self.fdt_header['off_mem_rsvmap']) |
| while True: |
| data = self.infile.read(header.size) |
| result = dict(zip(self.__fdt_reserve_entry_names, |
| header.unpack_from(data))) |
| if result['address'] == 0 and result['size'] == 0: |
| return entries |
| entries.append(result) |
| |
| def __extract_fdt_nodename(self): |
| """Extract node name""" |
| data = '' |
| pos = self.infile.tell() |
| while True: |
| byte = self.infile.read(1) |
| if ord(byte) == 0: |
| break |
| data += byte.decode('ascii') |
|         # skip the NUL terminator and re-align to a 4-byte boundary |
|         align_pos = pos + len(data) + 1 |
|         align_pos = (align_pos + 3) & ~3 |
|         self.infile.seek(align_pos) |
| return data |
| |
| def __extract_fdt_string(self, prop_string_pos): |
| """Extract string from string pool""" |
| data = '' |
| pos = self.infile.tell() |
| self.infile.seek(self.fdt_header['off_dt_strings']+prop_string_pos) |
| while True: |
| byte = self.infile.read(1) |
| if ord(byte) == 0: |
| break |
| data += byte.decode('ascii') |
| self.infile.seek(pos) |
| return data |
| |
| def __extract_fdt_prop(self): |
| """Extract property""" |
| prop = Struct(self.__fdt_dt_prop_format) |
| pos = self.infile.tell() |
| data = self.infile.read(prop.size) |
| (prop_size, prop_string_pos,) = prop.unpack_from(data) |
| |
| prop_start = pos + prop.size |
| if self.fdt_header['version'] < 16 and prop_size >= 8: |
|             prop_start = (prop_start + 7) & ~7 |
| |
| self.infile.seek(prop_start) |
| value = self.infile.read(prop_size) |
| |
| align_pos = self.infile.tell() |
|         align_pos = (align_pos + 3) & ~3 |
| self.infile.seek(align_pos) |
| |
| return (self.__extract_fdt_string(prop_string_pos), value) |
| |
| def __extract_fdt_dt(self): |
| """Extract tags""" |
| cell = Struct(self.__fdt_dt_cell_format) |
| tags = [] |
| self.infile.seek(self.fdt_header['off_dt_struct']) |
| while True: |
| data = self.infile.read(cell.size) |
| if len(data) < cell.size: |
| break |
| tag, = cell.unpack_from(data) |
| # print "*** %s" % self.__fdt_dt_tag_name.get(tag, '') |
|             if self.__fdt_dt_tag_name.get(tag, '') == 'node_begin': |
| name = self.__extract_fdt_nodename() |
| if len(name) == 0: |
| name = '/' |
| tags.append((tag, name)) |
| elif self.__fdt_dt_tag_name.get(tag, '') in ('node_end', 'nop'): |
| tags.append((tag, '')) |
|             elif self.__fdt_dt_tag_name.get(tag, '') == 'end': |
| tags.append((tag, '')) |
| break |
|             elif self.__fdt_dt_tag_name.get(tag, '') == 'prop': |
| propdata = self.__extract_fdt_prop() |
| tags.append((tag, propdata)) |
| else: |
| print("Unknown Tag %d" % tag) |
| return tags |
| |
| def __init__(self, infile): |
| """Init with file input""" |
| self.infile = infile |
| self.fdt_header = self.__extract_fdt_header() |
| if self.fdt_header['magic'] != FDT_MAGIC: |
| raise Exception('Invalid Magic') |
| if self.fdt_header['version'] > FDT_MAX_VERSION: |
| raise Exception('Invalid Version %d' % self.fdt_header['version']) |
| if self.fdt_header['last_comp_version'] > FDT_MAX_VERSION-1: |
| raise Exception('Invalid last compatible Version %d' % |
| self.fdt_header['last_comp_version']) |
| self.fdt_reserve_entries = self.__extract_fdt_reserve_entries() |
| self.fdt_dt_tags = self.__extract_fdt_dt() |
| |
| def __to_nodes(self): |
| """Represent fdt as Node and properties structure |
|         Returns a tuple with the pre-node Nops, the root node, |
|         and the post-node Nops |
| """ |
| prenops = [] |
| postnops = [] |
| rootnode = None |
| curnode = None |
| for tag in self.fdt_dt_tags: |
|             if self.__fdt_dt_tag_name.get(tag[0], '') == 'node_begin': |
| newnode = FdtNode(tag[1]) |
| if rootnode is None: |
| rootnode = newnode |
| if curnode is not None: |
| curnode.add_subnode(newnode) |
| newnode.set_parent_node(curnode) |
| curnode = newnode |
|             elif self.__fdt_dt_tag_name.get(tag[0], '') == 'node_end': |
| if curnode is not None: |
| curnode = curnode.get_parent_node() |
|             elif self.__fdt_dt_tag_name.get(tag[0], '') == 'nop': |
| if curnode is not None: |
| curnode.add_subnode(FdtNop()) |
| elif rootnode is not None: |
| postnops.append(FdtNop()) |
| else: |
| prenops.append(FdtNop()) |
|             elif self.__fdt_dt_tag_name.get(tag[0], '') == 'prop': |
| if curnode is not None: |
| curnode.add_raw_attribute(tag[1][0], tag[1][1]) |
|             elif self.__fdt_dt_tag_name.get(tag[0], '') == 'end': |
| continue |
| return (prenops, rootnode, postnops) |
| |
| def to_fdt(self): |
|         """Create an Fdt object from the parsed blob |
|         Returns an Fdt object |
| """ |
| if self.fdt_header['version'] >= 2: |
| boot_cpuid_phys = self.fdt_header['boot_cpuid_phys'] |
| else: |
| boot_cpuid_phys = 0 |
| fdt = Fdt(version=self.fdt_header['version'], |
| last_comp_version=self.fdt_header['last_comp_version'], |
| boot_cpuid_phys=boot_cpuid_phys) |
| (prenops, rootnode, postnops) = self.__to_nodes() |
| fdt.add_rootnode(rootnode, prenops=prenops, postnops=postnops) |
| fdt.add_reserve_entries(self.fdt_reserve_entries) |
| return fdt |
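|  |
|  |
| if __name__ == '__main__': |
|     # Minimal usage sketch, not part of the original module API: parse a .dtb |
|     # file given on the command line and print its DTS form. |
|     import sys |
|     if len(sys.argv) == 2: |
|         with open(sys.argv[1], 'rb') as dtb_file: |
|             parsed_fdt = FdtBlobParse(dtb_file).to_fdt() |
|         print(parsed_fdt.to_dts()) |
|     else: |
|         print("usage: %s <file.dtb>" % sys.argv[0]) |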