blob: 3b3238e258801bf4443054b1d9a598b461ccaef7 [file] [log] [blame]
# -*- coding: utf-8 -*-
"""
Device Tree Blob Parser
Copyright 2014 Neil 'superna' Armstrong <superna9999@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@author: Neil 'superna' Armstrong <superna9999@gmail.com>
"""
import string
import os
import json
from copy import deepcopy, copy
from struct import Struct, unpack, pack
FDT_MAGIC = 0xd00dfeed
FDT_BEGIN_NODE = 0x1
FDT_END_NODE = 0x2
FDT_PROP = 0x3
FDT_NOP = 0x4
FDT_END = 0x9
INDENT = ' ' * 4
FDT_MAX_VERSION = 17
class FdtProperty(object):
""" Represents an empty property"""
@staticmethod
def __validate_dt_name(name):
"""Checks the name validity"""
return not any([True for char in name
if char not in string.printable])
def __init__(self, name):
"""Init with name"""
self.name = name
if not FdtProperty.__validate_dt_name(self.name):
raise Exception("Invalid name '%s'" % self.name)
def get_name(self):
"""Get property name"""
return self.name
def __str__(self):
"""String representation"""
return "Property(%s)" % self.name
def dts_represent(self, depth=0):
"""Get dts string representation"""
return INDENT*depth + self.name + ';'
def dtb_represent(self, string_store, pos=0, version=17):
"""Get blob representation"""
# print "%x:%s" % (pos, self)
strpos = string_store.find(self.name+'\0')
if strpos < 0:
strpos = len(string_store)
string_store += self.name+'\0'
pos += 12
return (pack('>III', FDT_PROP, 0, strpos),
string_store, pos)
def json_represent(self, depth=0):
"""Ouput JSON"""
return '%s: null' % json.dumps(self.name)
def to_raw(self):
"""Return RAW value representation"""
return ''
def __getitem__(self, value):
"""Returns No Items"""
return None
def __ne__(self, node):
"""Check property inequality
"""
return not self.__eq__(node)
def __eq__(self, node):
"""Check node equality
check properties are the same (same values)
"""
if not isinstance(node, FdtProperty):
raise Exception("Invalid object type")
if self.name != node.get_name():
return False
return True
@staticmethod
def __check_prop_strings(value):
"""Check property string validity
Python version of util_is_printable_string from dtc
"""
pos = 0
posi = 0
end = len(value)
if not len(value):
return None
#Needed for python 3 support: If a bytes object is passed,
#decode it with the ascii codec. If the decoding fails, assume
#it was not a string object.
try:
value = value.decode('ascii')
except ValueError:
return None
#Test both against string 0 and int 0 because of
# python2/3 compatibility
if value[-1] != '\0':
return None
while pos < end:
posi = pos
while pos < end and value[pos] != '\0' \
and value[pos] in string.printable \
and value[pos] not in ('\r', '\n'):
pos += 1
if value[pos] != '\0' or pos == posi:
return None
pos += 1
return True
@staticmethod
def new_raw_property(name, raw_value):
"""Instantiate property with raw value type"""
if FdtProperty.__check_prop_strings(raw_value):
return FdtPropertyStrings.init_raw(name, raw_value)
elif len(raw_value) and len(raw_value) % 4 == 0:
return FdtPropertyWords.init_raw(name, raw_value)
elif len(raw_value) and len(raw_value):
return FdtPropertyBytes.init_raw(name, raw_value)
else:
return FdtProperty(name)
class FdtPropertyStrings(FdtProperty):
"""Property with strings as value"""
@classmethod
def __extract_prop_strings(cls, value):
"""Extract strings from raw_value"""
return [st for st in \
value.decode('ascii').split('\0') if len(st)]
def __init__(self, name, strings):
"""Init with strings"""
FdtProperty.__init__(self, name)
if not strings:
raise Exception("Invalid strings")
for stri in strings:
if len(stri) == 0:
raise Exception("Invalid strings")
if any([True for char in stri
if char not in string.printable
or char in ('\r', '\n')]):
raise Exception("Invalid chars in strings")
self.strings = strings
@classmethod
def init_raw(cls, name, raw_value):
"""Init from raw"""
return cls(name, cls.__extract_prop_strings(raw_value))
def dts_represent(self, depth=0):
"""Get dts string representation"""
return INDENT*depth + self.name + ' = "' + \
'", "'.join(self.strings) + '";'
def dtb_represent(self, string_store, pos=0, version=17):
"""Get blob representation"""
# print "%x:%s" % (pos, self)
blob = pack('')
for chars in self.strings:
blob += chars.encode('ascii') + pack('b', 0)
blob_len = len(blob)
if version < 16 and (pos+12) % 8 != 0:
blob = pack('b', 0) * (8-((pos+12) % 8)) + blob
if blob_len % 4:
blob += pack('b', 0) * (4-(blob_len % 4))
strpos = string_store.find(self.name+'\0')
if strpos < 0:
strpos = len(string_store)
string_store += self.name+'\0'
blob = pack('>III', FDT_PROP, blob_len, strpos) + blob
pos += len(blob)
return (blob, string_store, pos)
def json_represent(self, depth=0):
"""Ouput JSON"""
result = '%s: ["strings", ' % json.dumps(self.name)
result += ', '.join([json.dumps(stri) for stri in self.strings])
result += ']'
return result
def to_raw(self):
"""Return RAW value representation"""
return ''.join([chars+'\0' for chars in self.strings])
def __str__(self):
"""String representation"""
return "Property(%s,Strings:%s)" % (self.name, self.strings)
def __getitem__(self, index):
"""Get strings, returns a string"""
return self.strings[index]
def __len__(self):
"""Get strings count"""
return len(self.strings)
def __eq__(self, node):
"""Check node equality
check properties are the same (same values)
"""
if not FdtProperty.__eq__(self, node):
return False
if self.__len__() != len(node):
return False
for index in range(self.__len__()):
if self.strings[index] != node[index]:
return False
return True
class FdtPropertyWords(FdtProperty):
"""Property with words as value"""
def __init__(self, name, words):
"""Init with words"""
FdtProperty.__init__(self, name)
for word in words:
if not 0 <= word <= 4294967295:
raise Exception(("Invalid word value %d, requires " +
"0 <= number <= 4294967295") % word)
if not len(words):
raise Exception("Invalid Words")
self.words = words
@classmethod
def init_raw(cls, name, raw_value):
"""Init from raw"""
if len(raw_value) % 4 == 0:
words = [unpack(">I", raw_value[i:i+4])[0]
for i in range(0, len(raw_value), 4)]
return cls(name, words)
else:
raise Exception("Invalid raw Words")
def dts_represent(self, depth=0):
"""Get dts string representation"""
return INDENT*depth + self.name + ' = <' + \
' '.join(["0x%08x" % word for word in self.words]) + ">;"
def dtb_represent(self, string_store, pos=0, version=17):
"""Get blob representation"""
# # print "%x:%s" % (pos, self)
strpos = string_store.find(self.name+'\0')
if strpos < 0:
strpos = len(string_store)
string_store += self.name+'\0'
blob = pack('>III', FDT_PROP, len(self.words)*4, strpos) + \
pack('').join([pack('>I', word) for word in self.words])
pos += len(blob)
return (blob, string_store, pos)
def json_represent(self, depth=0):
"""Ouput JSON"""
result = '%s: ["words", "' % json.dumps(self.name)
result += '", "'.join(["0x%08x" % word for word in self.words])
result += '"]'
return result
def to_raw(self):
"""Return RAW value representation"""
return ''.join([pack('>I', word) for word in self.words])
def __str__(self):
"""String representation"""
return "Property(%s,Words:%s)" % (self.name, self.words)
def __getitem__(self, index):
"""Get words, returns a word integer"""
return self.words[index]
def __len__(self):
"""Get words count"""
return len(self.words)
def __eq__(self, node):
"""Check node equality
check properties are the same (same values)
"""
if not FdtProperty.__eq__(self, node):
return False
if self.__len__() != len(node):
return False
for index in range(self.__len__()):
if self.words[index] != node[index]:
return False
return True
class FdtPropertyBytes(FdtProperty):
"""Property with signed bytes as value"""
def __init__(self, name, bytez):
"""Init with bytes"""
FdtProperty.__init__(self, name)
for byte in bytez:
if not -128 <= byte <= 127:
raise Exception(("Invalid value for byte %d, " +
"requires -128 <= number <= 127") % byte)
if not bytez:
raise Exception("Invalid Bytes")
self.bytes = bytez
@classmethod
def init_raw(cls, name, raw_value):
"""Init from raw"""
return cls(name, unpack('b' * len(raw_value), raw_value))
def dts_represent(self, depth=0):
"""Get dts string representation"""
return INDENT*depth + self.name + ' = [' + \
' '.join(["%02x" % (byte & int('ffffffff',16))
for byte in self.bytes]) + "];"
def dtb_represent(self, string_store, pos=0, version=17):
"""Get blob representation"""
# print "%x:%s" % (pos, self)
strpos = string_store.find(self.name+'\0')
if strpos < 0:
strpos = len(string_store)
string_store += self.name+'\0'
blob = pack('>III', FDT_PROP, len(self.bytes), strpos)
blob += pack('').join([pack('>b', byte) for byte in self.bytes])
if len(blob) % 4:
blob += pack('b', 0) * (4-(len(blob) % 4))
pos += len(blob)
return (blob, string_store, pos)
def json_represent(self, depth=0):
"""Ouput JSON"""
result = '%s: ["bytes", "' % json.dumps(self.name)
result += '", "'.join(["%02x" % byte
for byte in self.bytes])
result += '"]'
return result
def to_raw(self):
"""Return RAW value representation"""
return ''.join([pack('>b', byte) for byte in self.bytes])
def __str__(self):
"""String representation"""
return "Property(%s,Bytes:%s)" % (self.name, self.bytes)
def __getitem__(self, index):
"""Get bytes, returns a byte"""
return self.bytes[index]
def __len__(self):
"""Get strings count"""
return len(self.bytes)
def __eq__(self, node):
"""Check node equality
check properties are the same (same values)
"""
if not FdtProperty.__eq__(self, node):
return False
if self.__len__() != len(node):
return False
for index in range(self.__len__()):
if self.bytes[index] != node[index]:
return False
return True
class FdtNop(object): # pylint: disable-msg=R0903
"""Nop child representation"""
def __init__(self):
"""Init with nothing"""
def get_name(self): # pylint: disable-msg=R0201
"""Return name"""
return None
def __str__(self):
"""String representation"""
return ''
def dts_represent(self, depth=0): # pylint: disable-msg=R0201
"""Get dts string representation"""
return INDENT*depth+'// [NOP]'
def dtb_represent(self, string_store, pos=0, version=17):
"""Get blob representation"""
# print "%x:%s" % (pos, self)
pos += 4
return (pack('>I', FDT_NOP), string_store, pos)
class FdtNode(object):
"""Node representation"""
@staticmethod
def __validate_dt_name(name):
"""Checks the name validity"""
return not any([True for char in name
if char not in string.printable])
def __init__(self, name):
"""Init node with name"""
self.name = name
self.subdata = []
self.parent = None
if not FdtNode.__validate_dt_name(self.name):
raise Exception("Invalid name '%s'" % self.name)
def get_name(self):
"""Get property name"""
return self.name
def __check_name_duplicate(self, name):
"""Checks if name is not in a subnode"""
for data in self.subdata:
if not isinstance(data, FdtNop) \
and data.get_name() == name:
return True
return False
def add_subnode(self, node):
"""Add child, deprecated use append()"""
self.append(node)
def add_raw_attribute(self, name, raw_value):
"""Construct a raw attribute and add to child"""
self.append(FdtProperty.new_raw_property(name, raw_value))
def set_parent_node(self, node):
"""Set parent node, None and FdtNode accepted"""
if node is not None and \
not isinstance(node, FdtNode):
raise Exception("Invalid object type")
self.parent = node
def get_parent_node(self):
"""Get parent node"""
return self.parent
def __str__(self):
"""String representation"""
return "Node(%s)" % self.name
def dts_represent(self, depth=0):
"""Get dts string representation"""
result = ('\n').join([sub.dts_represent(depth+1)
for sub in self.subdata])
if len(result) > 0:
result += '\n'
return INDENT*depth + self.name + ' {\n' + \
result + INDENT*depth + "};"
def dtb_represent(self, strings_store, pos=0, version=17):
"""Get blob representation
Pass string storage as strings_store, pos for current node start
and version as current dtb version
"""
# print "%x:%s" % (pos, self)
strings = strings_store
if self.get_name() == '/':
blob = pack('>II', FDT_BEGIN_NODE, 0)
else:
blob = pack('>I', FDT_BEGIN_NODE)
blob += self.get_name().encode('ascii') + pack('b', 0)
if len(blob) % 4:
blob += pack('b', 0) * (4-(len(blob) % 4))
pos += len(blob)
for sub in self.subdata:
(data, strings, pos) = sub.dtb_represent(strings, pos, version)
blob += data
pos += 4
blob += pack('>I', FDT_END_NODE)
return (blob, strings, pos)
def json_represent(self, depth=0):
"""Get dts string representation"""
result = (',\n'+ \
INDENT*(depth+1)).join([sub.json_represent(depth+1)
for sub in self.subdata
if not isinstance(sub, FdtNop)])
if len(result) > 0:
result = INDENT + result + '\n'+INDENT*depth
if self.get_name() == '/':
return "{\n" + INDENT*(depth) + result + "}"
else:
return json.dumps(self.name) + ': {\n' + \
INDENT*(depth) + result + "}"
def __getitem__(self, index):
"""Get subnodes, returns either a Node, a Property or a Nop"""
return self.subdata[index]
def __setitem__(self, index, subnode):
"""Set node at index, replacing previous subnode,
must not be a duplicate name
"""
if self.subdata[index].get_name() != subnode.get_name() and \
self.__check_name_duplicate(subnode.get_name()):
raise Exception("%s : %s subnode already exists" % \
(self, subnode))
if not isinstance(subnode, (FdtNode, FdtProperty, FdtNop)):
raise Exception("Invalid object type")
self.subdata[index] = subnode
def __len__(self):
"""Get strings count"""
return len(self.subdata)
def __ne__(self, node):
"""Check node inequality
i.e. is subnodes are the same, in either order
and properties are the same (same values)
The FdtNop is excluded from the check
"""
return not self.__eq__(node)
def __eq__(self, node):
"""Check node equality
i.e. is subnodes are the same, in either order
and properties are the same (same values)
The FdtNop is excluded from the check
"""
if not isinstance(node, FdtNode):
raise Exception("Invalid object type")
if self.name != node.get_name():
return False
curnames = set([subnode.get_name() for subnode in self.subdata
if not isinstance(subnode, FdtNop)])
cmpnames = set([subnode.get_name() for subnode in node
if not isinstance(subnode, FdtNop)])
if curnames != cmpnames:
return False
for subnode in [subnode for subnode in self.subdata
if not isinstance(subnode, FdtNop)]:
index = node.index(subnode.get_name())
if subnode != node[index]:
return False
return True
def append(self, subnode):
"""Append subnode, same as add_subnode"""
if self.__check_name_duplicate(subnode.get_name()):
raise Exception("%s : %s subnode already exists" % \
(self, subnode))
if not isinstance(subnode, (FdtNode, FdtProperty, FdtNop)):
raise Exception("Invalid object type")
self.subdata.append(subnode)
def pop(self, index=-1):
"""Remove and returns subnode at index, default the last"""
return self.subdata.pop(index)
def insert(self, index, subnode):
"""Insert subnode before index, must not be a duplicate name"""
if self.__check_name_duplicate(subnode.get_name()):
raise Exception("%s : %s subnode already exists" % \
(self, subnode))
if not isinstance(subnode, (FdtNode, FdtProperty, FdtNop)):
raise Exception("Invalid object type")
self.subdata.insert(index, subnode)
def _find(self, name):
"""Find name in subnodes"""
for i in range(0, len(self.subdata)):
if not isinstance(self.subdata[i], FdtNop) and \
name == self.subdata[i].get_name():
return i
return None
def remove(self, name):
"""Remove subnode with the name
Raises ValueError is not present
"""
index = self._find(name)
if index is None:
raise ValueError("Not present")
return self.subdata.pop(index)
def index(self, name):
"""Returns position of subnode with the name
Raises ValueError is not present
"""
index = self._find(name)
if index is None:
raise ValueError("Not present")
return index
def merge(self, node):
"""Merge two nodes and subnodes
Replace current properties with the given properties
"""
if not isinstance(node, FdtNode):
raise Exception("Can only merge with a FdtNode")
for subnode in [obj for obj in node
if isinstance(obj, (FdtNode, FdtProperty))]:
index = self._find(subnode.get_name())
if index is None:
dup = deepcopy(subnode)
if isinstance(subnode, FdtNode):
dup.set_parent_node(self)
self.append(dup)
elif isinstance(subnode, FdtNode):
self.subdata[index].merge(subnode)
else:
self.subdata[index] = copy(subnode)
def walk(self):
"""Walk into subnodes and yield paths and objects
Returns set with (path string, node object)
"""
node = self
start = 0
hist = []
curpath = []
while True:
for index in range(start, len(node)):
if isinstance(node[index], (FdtNode, FdtProperty)):
yield ('/' + '/'.join(curpath+[node[index].get_name()]),
node[index])
if isinstance(node[index], FdtNode):
if len(node[index]):
hist.append((node, index+1))
curpath.append(node[index].get_name())
node = node[index]
start = 0
index = -1
break
if index >= 0:
if len(hist):
(node, start) = hist.pop()
curpath.pop()
else:
break
class Fdt(object):
"""Flattened Device Tree representation"""
def __init__(self, version=17, last_comp_version=16, boot_cpuid_phys=0):
"""Init FDT object with version and boot values"""
self.header = {'magic': FDT_MAGIC,
'totalsize': 0,
'off_dt_struct': 0,
'off_dt_strings': 0,
'off_mem_rsvmap': 0,
'version': version,
'last_comp_version': last_comp_version,
'boot_cpuid_phys': boot_cpuid_phys,
'size_dt_strings': 0,
'size_dt_struct': 0}
self.rootnode = None
self.prenops = None
self.postnops = None
self.reserve_entries = None
def add_rootnode(self, rootnode, prenops=None, postnops=None):
"""Add root node"""
self.rootnode = rootnode
self.prenops = prenops
self.postnops = postnops
def get_rootnode(self):
"""Get root node"""
return self.rootnode
def add_reserve_entries(self, reserve_entries):
"""Add reserved entries as list of dict with
'address' and 'size' keys"""
self.reserve_entries = reserve_entries
def to_dts(self):
"""Export to DTS representation in string format"""
result = "/dts-v1/;\n"
result += "// version:\t\t%d\n" % self.header['version']
result += "// last_comp_version:\t%d\n" % \
self.header['last_comp_version']
if self.header['version'] >= 2:
result += "// boot_cpuid_phys:\t0x%x\n" % \
self.header['boot_cpuid_phys']
result += '\n'
if self.reserve_entries is not None:
for entry in self.reserve_entries:
result += "/memreserve/ "
if entry['address']:
result += "%#x " % entry['address']
else:
result += "0 "
if entry['size']:
result += "%#x" % entry['size']
else:
result += "0"
result += ";\n"
if self.prenops:
result += '\n'.join([nop.dts_represent() for nop in self.prenops])
result += '\n'
if self.rootnode is not None:
result += self.rootnode.dts_represent()
if self.postnops:
result += '\n'
result += '\n'.join([nop.dts_represent() for nop in self.postnops])
return result
def to_dtb(self):
"""Export to Blob format"""
if self.rootnode is None:
return None
blob_reserve_entries = pack('')
if self.reserve_entries is not None:
for entry in self.reserve_entries:
blob_reserve_entries += pack('>QQ',
entry['address'],
entry['size'])
blob_reserve_entries += pack('>QQ', 0, 0)
header_size = 7 * 4
if self.header['version'] >= 2:
header_size += 4
if self.header['version'] >= 3:
header_size += 4
if self.header['version'] >= 17:
header_size += 4
header_adjust = pack('')
if header_size % 8 != 0:
header_adjust = pack('b', 0) * (8 - (header_size % 8))
header_size += len(header_adjust)
dt_start = header_size + len(blob_reserve_entries)
# print "dt_start %d" % dt_start
(blob_dt, blob_strings, dt_pos) = \
self.rootnode.dtb_represent('', dt_start, self.header['version'])
if self.prenops is not None:
blob_dt = pack('').join([nop.dtb_represent('')[0]
for nop in self.prenops])\
+ blob_dt
if self.postnops is not None:
blob_dt += pack('').join([nop.dtb_represent('')[0]
for nop in self.postnops])
blob_dt += pack('>I', FDT_END)
self.header['size_dt_strings'] = len(blob_strings)
self.header['size_dt_struct'] = len(blob_dt)
self.header['off_mem_rsvmap'] = header_size
self.header['off_dt_struct'] = dt_start
self.header['off_dt_strings'] = dt_start + len(blob_dt)
self.header['totalsize'] = dt_start + len(blob_dt) + len(blob_strings)
blob_header = pack('>IIIIIII', self.header['magic'],
self.header['totalsize'],
self.header['off_dt_struct'],
self.header['off_dt_strings'],
self.header['off_mem_rsvmap'],
self.header['version'],
self.header['last_comp_version'])
if self.header['version'] >= 2:
blob_header += pack('>I', self.header['boot_cpuid_phys'])
if self.header['version'] >= 3:
blob_header += pack('>I', self.header['size_dt_strings'])
if self.header['version'] >= 17:
blob_header += pack('>I', self.header['size_dt_struct'])
return blob_header + header_adjust + blob_reserve_entries + \
blob_dt + blob_strings.encode('ascii')
def to_json(self):
"""Ouput JSON"""
if self.rootnode is None:
return None
return self.rootnode.json_represent()
def resolve_path(self, path):
"""Resolve path like /memory/reg and return either a FdtNode,
a FdtProperty or None"""
if self.rootnode is None:
return None
if not path.startswith('/'):
return None
if len(path) > 1 and path.endswith('/'):
path = path[:-1]
if path == '/':
return self.rootnode
curnode = self.rootnode
for subpath in path[1:].split('/'):
found = None
if not isinstance(curnode, FdtNode):
return None
for node in curnode:
if subpath == node.get_name():
found = node
break
if found is None:
return None
curnode = found
return curnode
def _add_json_to_fdtnode(node, subjson):
"""Populate FdtNode with JSON dict items"""
for (key, value) in subjson.items():
if isinstance(value, dict):
subnode = FdtNode(key)
subnode.set_parent_node(node)
node.append(subnode)
_add_json_to_fdtnode(subnode, value)
elif isinstance(value, list):
if len(value) < 2:
raise Exception("Invalid list for %s" % key)
if value[0] == "words":
words = [int(word, 16) for word in value[1:]]
node.append(FdtPropertyWords(key, words))
elif value[0] == "bytes":
bytez = [int(byte, 16) for byte in value[1:]]
node.append(FdtPropertyBytes(key, bytez))
elif value[0] == "strings":
node.append(FdtPropertyStrings(key, \
[s for s in value[1:]]))
else:
raise Exception("Invalid list for %s" % key)
elif value is None:
node.append(FdtProperty(key))
else:
raise Exception("Invalid value for %s" % key)
def FdtJsonParse(buf):
"""Import FDT from JSON representation, see JSONDeviceTree.md for
structure and encoding
Returns an Fdt object
"""
tree = json.loads(buf)
root = FdtNode('/')
_add_json_to_fdtnode(root, tree)
fdt = Fdt()
fdt.add_rootnode(root)
return fdt
def FdtFsParse(path):
"""Parse device tree filesystem and return a Fdt instance
Should be /proc/device-tree on a device, or the fusemount.py
mount point.
"""
root = FdtNode("/")
if path.endswith('/'):
path = path[:-1]
nodes = {path: root}
for subpath, subdirs, files in os.walk(path):
if subpath not in nodes.keys():
raise Exception("os.walk error")
cur = nodes[subpath]
for f in files:
with open(subpath+'/'+f, 'rb') as content_file:
content = content_file.read()
prop = FdtProperty.new_raw_property(f, content)
cur.add_subnode(prop)
for subdir in subdirs:
subnode = FdtNode(subdir)
cur.add_subnode(subnode)
subnode.set_parent_node(cur)
nodes[subpath+'/'+subdir] = subnode
fdt = Fdt()
fdt.add_rootnode(root)
return fdt
class FdtBlobParse(object): # pylint: disable-msg=R0903
"""Parse from file input"""
__fdt_header_format = ">IIIIIII"
__fdt_header_names = ('magic', 'totalsize', 'off_dt_struct',
'off_dt_strings', 'off_mem_rsvmap', 'version',
'last_comp_version')
__fdt_reserve_entry_format = ">QQ"
__fdt_reserve_entry_names = ('address', 'size')
__fdt_dt_cell_format = ">I"
__fdt_dt_prop_format = ">II"
__fdt_dt_tag_name = {FDT_BEGIN_NODE: 'node_begin',
FDT_END_NODE: 'node_end',
FDT_PROP: 'prop',
FDT_NOP: 'nop',
FDT_END: 'end'}
def __extract_fdt_header(self):
"""Extract DTB header"""
header = Struct(self.__fdt_header_format)
header_entry = Struct(">I")
data = self.infile.read(header.size)
result = dict(zip(self.__fdt_header_names, header.unpack_from(data)))
if result['version'] >= 2:
data = self.infile.read(header_entry.size)
result['boot_cpuid_phys'] = header_entry.unpack_from(data)[0]
if result['version'] >= 3:
data = self.infile.read(header_entry.size)
result['size_dt_strings'] = header_entry.unpack_from(data)[0]
if result['version'] >= 17:
data = self.infile.read(header_entry.size)
result['size_dt_struct'] = header_entry.unpack_from(data)[0]
return result
def __extract_fdt_reserve_entries(self):
"""Extract reserved memory entries"""
header = Struct(self.__fdt_reserve_entry_format)
entries = []
self.infile.seek(self.fdt_header['off_mem_rsvmap'])
while True:
data = self.infile.read(header.size)
result = dict(zip(self.__fdt_reserve_entry_names,
header.unpack_from(data)))
if result['address'] == 0 and result['size'] == 0:
return entries
entries.append(result)
def __extract_fdt_nodename(self):
"""Extract node name"""
data = ''
pos = self.infile.tell()
while True:
byte = self.infile.read(1)
if ord(byte) == 0:
break
data += byte.decode('ascii')
align_pos = pos + len(data) + 1
align_pos = (((align_pos) + ((4) - 1)) & ~((4) - 1))
self.infile.seek(align_pos)
return data
def __extract_fdt_string(self, prop_string_pos):
"""Extract string from string pool"""
data = ''
pos = self.infile.tell()
self.infile.seek(self.fdt_header['off_dt_strings']+prop_string_pos)
while True:
byte = self.infile.read(1)
if ord(byte) == 0:
break
data += byte.decode('ascii')
self.infile.seek(pos)
return data
def __extract_fdt_prop(self):
"""Extract property"""
prop = Struct(self.__fdt_dt_prop_format)
pos = self.infile.tell()
data = self.infile.read(prop.size)
(prop_size, prop_string_pos,) = prop.unpack_from(data)
prop_start = pos + prop.size
if self.fdt_header['version'] < 16 and prop_size >= 8:
prop_start = (((prop_start) + ((8) - 1)) & ~((8) - 1))
self.infile.seek(prop_start)
value = self.infile.read(prop_size)
align_pos = self.infile.tell()
align_pos = (((align_pos) + ((4) - 1)) & ~((4) - 1))
self.infile.seek(align_pos)
return (self.__extract_fdt_string(prop_string_pos), value)
def __extract_fdt_dt(self):
"""Extract tags"""
cell = Struct(self.__fdt_dt_cell_format)
tags = []
self.infile.seek(self.fdt_header['off_dt_struct'])
while True:
data = self.infile.read(cell.size)
if len(data) < cell.size:
break
tag, = cell.unpack_from(data)
# print "*** %s" % self.__fdt_dt_tag_name.get(tag, '')
if self.__fdt_dt_tag_name.get(tag, '') in 'node_begin':
name = self.__extract_fdt_nodename()
if len(name) == 0:
name = '/'
tags.append((tag, name))
elif self.__fdt_dt_tag_name.get(tag, '') in ('node_end', 'nop'):
tags.append((tag, ''))
elif self.__fdt_dt_tag_name.get(tag, '') in 'end':
tags.append((tag, ''))
break
elif self.__fdt_dt_tag_name.get(tag, '') in 'prop':
propdata = self.__extract_fdt_prop()
tags.append((tag, propdata))
else:
print("Unknown Tag %d" % tag)
return tags
def __init__(self, infile):
"""Init with file input"""
self.infile = infile
self.fdt_header = self.__extract_fdt_header()
if self.fdt_header['magic'] != FDT_MAGIC:
raise Exception('Invalid Magic')
if self.fdt_header['version'] > FDT_MAX_VERSION:
raise Exception('Invalid Version %d' % self.fdt_header['version'])
if self.fdt_header['last_comp_version'] > FDT_MAX_VERSION-1:
raise Exception('Invalid last compatible Version %d' %
self.fdt_header['last_comp_version'])
self.fdt_reserve_entries = self.__extract_fdt_reserve_entries()
self.fdt_dt_tags = self.__extract_fdt_dt()
def __to_nodes(self):
"""Represent fdt as Node and properties structure
Returns a set with the pre-node Nops, the Root Node,
and the post-node Nops.
"""
prenops = []
postnops = []
rootnode = None
curnode = None
for tag in self.fdt_dt_tags:
if self.__fdt_dt_tag_name.get(tag[0], '') in 'node_begin':
newnode = FdtNode(tag[1])
if rootnode is None:
rootnode = newnode
if curnode is not None:
curnode.add_subnode(newnode)
newnode.set_parent_node(curnode)
curnode = newnode
elif self.__fdt_dt_tag_name.get(tag[0], '') in 'node_end':
if curnode is not None:
curnode = curnode.get_parent_node()
elif self.__fdt_dt_tag_name.get(tag[0], '') in 'nop':
if curnode is not None:
curnode.add_subnode(FdtNop())
elif rootnode is not None:
postnops.append(FdtNop())
else:
prenops.append(FdtNop())
elif self.__fdt_dt_tag_name.get(tag[0], '') in 'prop':
if curnode is not None:
curnode.add_raw_attribute(tag[1][0], tag[1][1])
elif self.__fdt_dt_tag_name.get(tag[0], '') in 'end':
continue
return (prenops, rootnode, postnops)
def to_fdt(self):
"""Create a fdt object
Returns a Fdt object
"""
if self.fdt_header['version'] >= 2:
boot_cpuid_phys = self.fdt_header['boot_cpuid_phys']
else:
boot_cpuid_phys = 0
fdt = Fdt(version=self.fdt_header['version'],
last_comp_version=self.fdt_header['last_comp_version'],
boot_cpuid_phys=boot_cpuid_phys)
(prenops, rootnode, postnops) = self.__to_nodes()
fdt.add_rootnode(rootnode, prenops=prenops, postnops=postnops)
fdt.add_reserve_entries(self.fdt_reserve_entries)
return fdt