Source code for plistrc.schema_extractor
"""Property list file schema extractor."""
import datetime
import logging
import os
import plistlib
import xml
from artifacts import definitions as artifacts_definitions
from artifacts import reader as artifacts_reader
from artifacts import registry as artifacts_registry
from dfdatetime import cocoa_time as dfdatetime_cocoa_time
from dfimagetools import definitions as dfimagetools_definitions
from dfimagetools import file_entry_lister
from plistrc import decoders
from plistrc import resources
from plistrc import yaml_definitions_file
[docs]
class PropertyListSchemaExtractor:
"""Property list file schema extractor."""
_COMPOSITE_VALUE_TYPES = frozenset(["array", "dict"])
_MAXIMUM_FILE_SIZE = 64 * 1024 * 1024
_MINIMUM_FILE_SIZE = 8
_PROPERTY_LIST_DEFINITIONS_FILE = os.path.join(
os.path.dirname(__file__), "data", "known_property_lists.yaml"
)
_UTF8_BYTE_ORDER_MARK = b"\xef\xbb\xbf"
_UTF16BE_BYTE_ORDER_MARK = b"\xfe\xff"
_UTF16LE_BYTE_ORDER_MARK = b"\xff\xfe"
_UTF32BE_BYTE_ORDER_MARK = b"\x00\x00\xfe\xff"
_UTF32LE_BYTE_ORDER_MARK = b"\xff\xfe\x00\x00"
[docs]
def __init__(self, artifact_definitions, mediator=None):
"""Initializes a property list file schema extractor.
Args:
artifact_definitions (str): path to a single artifact definitions
YAML file or a directory of definitions YAML files.
mediator (Optional[dfvfs.VolumeScannerMediator]): a volume scanner
mediator.
"""
super().__init__()
self._artifacts_registry = artifacts_registry.ArtifactDefinitionsRegistry()
self._known_property_list_definitions = {}
self._mediator = mediator
self._nskeyedarchiver_decoder = decoders.NSKeyedArchiverDecoder()
if artifact_definitions:
reader = artifacts_reader.YamlArtifactsReader()
if os.path.isdir(artifact_definitions):
self._artifacts_registry.ReadFromDirectory(reader, artifact_definitions)
elif os.path.isfile(artifact_definitions):
self._artifacts_registry.ReadFromFile(reader, artifact_definitions)
definitions_file = yaml_definitions_file.YAMLPropertyListDefinitionsFile()
for property_list_definition in definitions_file.ReadFromFile(
self._PROPERTY_LIST_DEFINITIONS_FILE
):
artifact_definition = self._artifacts_registry.GetDefinitionByName(
property_list_definition.artifact_definition
)
if not artifact_definition:
logging.warning(
f"Unknown artifact definition: "
f"{property_list_definition.artifact_definition:s}"
)
else:
self._known_property_list_definitions[
property_list_definition.property_list_identifier
] = artifact_definition
def _CheckByteOrderMark(self, data):
"""Determines if a property list starts with a byte-order-mark.
Args:
data (bytes): data.
Returns:
tuple[int, str]: size of the byte-order-mark or 0 if no byte-order-mark
was detected and encoding.
"""
if data.startswith(self._UTF32BE_BYTE_ORDER_MARK):
return 4, "utf-32-be"
if data.startswith(self._UTF32LE_BYTE_ORDER_MARK):
return 4, "utf-32-le"
if data.startswith(self._UTF16BE_BYTE_ORDER_MARK):
return 2, "utf-16-be"
if data.startswith(self._UTF16LE_BYTE_ORDER_MARK):
return 2, "utf-16-le"
if data.startswith(self._UTF8_BYTE_ORDER_MARK):
return 3, "utf-8"
return 0, "ascii"
def _CheckSignature(self, file_object):
"""Checks the signature of a given file-like object.
Args:
file_object (dfvfs.FileIO): file-like object of the property list.
Returns:
bool: True if the signature matches that of a property list, False
otherwise.
"""
if not file_object:
return False
file_object.seek(0, os.SEEK_SET)
file_data = file_object.read()
if file_data.startswith(b"bplist0"):
return True
byte_order_mark_size, encoding = self._CheckByteOrderMark(file_data)
xml_signature = "<?xml ".encode(encoding)
is_xml = file_data[byte_order_mark_size:].startswith(xml_signature)
if not is_xml:
# Preserve the byte-order-mark for plistlib.
file_data = b"".join(
[
file_data[:byte_order_mark_size],
file_data[byte_order_mark_size:].lstrip(),
]
)
is_xml = file_data[byte_order_mark_size:].startswith(xml_signature)
if is_xml:
logging.info("XML plist file with leading whitespace")
if is_xml:
plist_footer = "</plist>".encode(encoding)
file_data = file_data.rstrip()
if not file_data.endswith(plist_footer):
return False
return is_xml
def _FormatSchemaAsYAML(self, schema):
"""Formats a schema into YAML.
Args:
schema (PropertyDefinition): schema.
Returns:
str: schema formatted as YAML.
"""
tables = []
for property_definition in self._GetDictPropertyDefinitions(schema):
if not property_definition.schema:
continue
name = property_definition.key_path or "."
table = [f"table: {name:s}", "columns:"]
for value_property_definition in sorted(
property_definition.schema, key=lambda definition: definition.name
):
table.append(f"- name: {value_property_definition.name:s}")
if value_property_definition.value_type != "array":
value_type = value_property_definition.value_type
else:
array_value_types = ",".join(
sorted(
{
definition.value_type
for definition in value_property_definition.schema
}
)
)
value_type = f"array[{array_value_types:s}]"
table.append(f" value_type: {value_type:s}")
if table not in tables:
tables.append(table)
lines = ["# PList-kb property list schema.", "---"]
for table in sorted(tables):
lines.extend(table)
lines.append("---")
return "\n".join(lines)
def _GetDictPropertyDefinitions(self, property_definition):
"""Retrieves the dictionary property definitions.
Yields:
PropertyDefinition: dict property definition.
"""
if property_definition.value_type == "dict":
yield property_definition
for value_property_definition in property_definition.schema:
if value_property_definition.value_type in self._COMPOSITE_VALUE_TYPES:
yield from self._GetDictPropertyDefinitions(value_property_definition)
def _GetPropertyListIdentifier(self, path_segments):
"""Determines the property list identifier.
Args:
path_segments (list[str]): path segments.
Returns:
str: property list identifier or None if the type could not be determined.
"""
# TODO: make comparison more efficient.
for (
property_list_identifier,
artifact_definition,
) in self._known_property_list_definitions.items():
for source in artifact_definition.sources:
if source.type_indicator in (
artifacts_definitions.TYPE_INDICATOR_DIRECTORY,
artifacts_definitions.TYPE_INDICATOR_FILE,
artifacts_definitions.TYPE_INDICATOR_PATH,
):
for source_path in set(source.paths):
source_path_segments = source_path.split(source.separator)
if not source_path_segments[0]:
source_path_segments = source_path_segments[1:]
# TODO: add support for parameters.
last_index = len(source_path_segments)
for index in range(1, last_index + 1):
source_path_segment = source_path_segments[-index]
if not source_path_segment or len(source_path_segment) < 2:
continue
if (
source_path_segment[0] == "%"
and source_path_segment[-1] == "%"
):
source_path_segments = source_path_segments[
-index + 1 :
]
break
if len(source_path_segments) > len(path_segments):
continue
is_match = True
last_index = min(len(source_path_segments), len(path_segments))
for index in range(1, last_index + 1):
source_path_segment = source_path_segments[-index]
# TODO: improve handling of *
if "*" in source_path_segment:
continue
path_segment = path_segments[-index].lower()
source_path_segment = source_path_segment.lower()
is_match = path_segment == source_path_segment
if not is_match:
break
if is_match:
return property_list_identifier
return None
def _GetPropertyListKeyPath(self, key_path_segments):
"""Retrieves a property list key path.
Args:
key_path_segments (list[str]): property list key path segments.
Returns:
str: property list key path.
"""
# TODO: escape '.' in path segments
return ".".join(key_path_segments)
def _GetPropertyListSchemaFromItem(self, item, key_path_segments):
"""Retrieves schema from given property list item.
Args:
item (object): property list item.
key_path_segments (list[str]): property list key path segments.
Returns:
PropertyDefinition: property definition of the item.
Raises:
RuntimeError: if the item is not supported.
"""
property_definition = resources.PropertyDefinition()
property_definition.key_path = self._GetPropertyListKeyPath(key_path_segments)
property_definition.value_type = self._GetPropertyListValueType(item)
if isinstance(item, dict):
for key, value in item.items():
value_type = self._GetPropertyListValueType(item)
if value_type not in self._COMPOSITE_VALUE_TYPES:
value_property_definition = resources.PropertyDefinition()
value_property_definition.name = key
value_property_definition.value_type = value_type
else:
value_key_path_segments = list(key_path_segments)
value_key_path_segments.append(key)
value_property_definition = self._GetPropertyListSchemaFromItem(
value, value_key_path_segments
)
value_property_definition.name = key
property_definition.schema.append(value_property_definition)
elif isinstance(item, list):
for value in item:
value_type = self._GetPropertyListValueType(item)
if value_type not in self._COMPOSITE_VALUE_TYPES:
value_property_definition = resources.PropertyDefinition()
value_property_definition.value_type = value_type
else:
value_property_definition = self._GetPropertyListSchemaFromItem(
value, key_path_segments
)
property_definition.schema.append(value_property_definition)
return property_definition
def _GetPropertyListValueType(self, item):
"""Retrieves property list value type.
Args:
item (object): property list item.
Yields:
str: value type.
Raises:
RuntimeError: if the value type is not supported.
"""
if item is None:
return "null"
if isinstance(item, bytes):
return "data"
if isinstance(item, dict):
return "dict"
if isinstance(item, float):
return "real"
if isinstance(item, int):
return "int"
if isinstance(item, list):
return "array"
if isinstance(item, str):
return "string"
if isinstance(item, plistlib.UID):
return "UID"
if isinstance(item, (datetime.datetime, dfdatetime_cocoa_time.CocoaTime)):
return "date"
value_type = type(item)
raise RuntimeError(f"Unsupported value type: {value_type!s}")
[docs]
def GetDisplayPath(self, path_segments, data_stream_name=None):
"""Retrieves a path to display.
Args:
path_segments (list[str]): path segments of the full path of the file
entry.
data_stream_name (Optional[str]): name of the data stream.
Returns:
str: path to display.
"""
display_path = ""
path_segments = [
segment.translate(
dfimagetools_definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE
)
for segment in path_segments
]
display_path = "".join([display_path, "/".join(path_segments)])
if data_stream_name:
data_stream_name = data_stream_name.translate(
dfimagetools_definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE
)
display_path = ":".join([display_path, data_stream_name])
return display_path or "/"
[docs]
def ExtractSchemas(self, path, options=None):
"""Extracts property list schemas from the path.
Args:
path (str): path of a property list file or storage media image containing
property list files.
options (Optional[dfvfs.VolumeScannerOptions]): volume scanner options. If
None the default volume scanner options are used, which are defined in
the dfVFS VolumeScannerOptions class.
Yields:
tuple[str, dict[str, str]]: known property list type identifier or the
name of the property list file if not known and schema.
"""
entry_lister = file_entry_lister.FileEntryLister(mediator=self._mediator)
base_path_specs = entry_lister.GetBasePathSpecs(path, options=options)
if not base_path_specs:
logging.warning(
f"Unable to determine base path specifications from: {path:s}"
)
else:
for file_entry, path_segments in entry_lister.ListFileEntries(
base_path_specs
):
if (
not file_entry.IsFile()
or file_entry.size < self._MINIMUM_FILE_SIZE
or file_entry.size > self._MAXIMUM_FILE_SIZE
):
continue
file_object = file_entry.GetFileObject()
if not self._CheckSignature(file_object):
continue
display_path = self.GetDisplayPath(path_segments)
# Skip Cocoa nib files for now https://developer.apple.com/library/
# archive/documentation/Cocoa/Conceptual/LoadingResources/CocoaNibs/
# CocoaNibs.html
if path_segments[-1].endswith(".nib"):
# logging.info(f'Skipping nib plist file: {display_path:s}')
continue
# logging.info(f'Extracting schema from plist file: {display_path:s}')
# Note that plistlib assumes the file-like object current offset is at
# the start of the property list.
file_object.seek(0, os.SEEK_SET)
try:
root_item = plistlib.load(file_object)
except plistlib.InvalidFileException:
logging.error(f"Invalid property list file: {display_path:s}")
except xml.parsers.expat.ExpatError:
logging.error(f"Corrupt XML property list file: {display_path:s}")
if self._nskeyedarchiver_decoder.IsEncoded(root_item):
try:
root_item = self._nskeyedarchiver_decoder.Decode(root_item)
except RuntimeError as exception:
logging.error(
f"Unable to decode property list file: {display_path:s} "
f"with error: {exception!s}"
)
plist_schema = self._GetPropertyListSchemaFromItem(root_item, [""])
if plist_schema is None:
logging.warning(
f"Unable to determine schema from plist file: {display_path:s}"
)
continue
property_list_identifier = self._GetPropertyListIdentifier(
path_segments
)
if not property_list_identifier:
logging.warning(
f"Unable to determine known property list identifier of file: "
f"{display_path:s}"
)
property_list_identifier = path_segments[-1]
yield property_list_identifier, plist_schema
[docs]
def FormatSchema(self, schema, output_format):
"""Formats a schema into the output format.
Args:
schema (PropertyDefinition): schema.
output_format (str): output format.
Returns:
str: formatted schema.
Raises:
RuntimeError: if a query could not be parsed.
"""
if output_format == "yaml":
return self._FormatSchemaAsYAML(schema)
raise RuntimeError(f"Unsupported output format: {output_format:s}")