#!/usr/bin/env python3 # # Copyright The Mbed TLS Contributors # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later """Audit validity date of X509 crt/crl/csr. This script is used to audit the validity date of crt/crl/csr used for testing. It prints the information about X.509 objects excluding the objects that are valid throughout the desired validity period. The data are collected from tests/data_files/ and tests/suites/*.data files by default. """ import os import re import typing import argparse import datetime import glob import logging import hashlib from enum import Enum # The script requires cryptography >= 35.0.0 which is only available # for Python >= 3.6. import cryptography from cryptography import x509 from generate_test_code import FileWrapper import scripts_path # pylint: disable=unused-import from mbedtls_dev import build_tree from mbedtls_dev import logging_util def check_cryptography_version(): match = re.match(r'^[0-9]+', cryptography.__version__) if match is None or int(match.group(0)) < 35: raise Exception("audit-validity-dates requires cryptography >= 35.0.0" + "({} is too old)".format(cryptography.__version__)) class DataType(Enum): CRT = 1 # Certificate CRL = 2 # Certificate Revocation List CSR = 3 # Certificate Signing Request class DataFormat(Enum): PEM = 1 # Privacy-Enhanced Mail DER = 2 # Distinguished Encoding Rules class AuditData: """Store data location, type and validity period of X.509 objects.""" #pylint: disable=too-few-public-methods def __init__(self, data_type: DataType, x509_obj): self.data_type = data_type # the locations that the x509 object could be found self.locations = [] # type: typing.List[str] self.fill_validity_duration(x509_obj) self._obj = x509_obj encoding = cryptography.hazmat.primitives.serialization.Encoding.DER self._identifier = hashlib.sha1(self._obj.public_bytes(encoding)).hexdigest() @property def identifier(self): """ Identifier of the underlying X.509 object, which is consistent across different runs. """ return self._identifier def fill_validity_duration(self, x509_obj): """Read validity period from an X.509 object.""" # Certificate expires after "not_valid_after" # Certificate is invalid before "not_valid_before" if self.data_type == DataType.CRT: self.not_valid_after = x509_obj.not_valid_after self.not_valid_before = x509_obj.not_valid_before # CertificateRevocationList expires after "next_update" # CertificateRevocationList is invalid before "last_update" elif self.data_type == DataType.CRL: self.not_valid_after = x509_obj.next_update self.not_valid_before = x509_obj.last_update # CertificateSigningRequest is always valid. elif self.data_type == DataType.CSR: self.not_valid_after = datetime.datetime.max self.not_valid_before = datetime.datetime.min else: raise ValueError("Unsupported file_type: {}".format(self.data_type)) class X509Parser: """A parser class to parse crt/crl/csr file or data in PEM/DER format.""" PEM_REGEX = br'-{5}BEGIN (?P.*?)-{5}(?P.*?)-{5}END (?P=type)-{5}' PEM_TAG_REGEX = br'-{5}BEGIN (?P.*?)-{5}\n' PEM_TAGS = { DataType.CRT: 'CERTIFICATE', DataType.CRL: 'X509 CRL', DataType.CSR: 'CERTIFICATE REQUEST' } def __init__(self, backends: typing.Dict[DataType, typing.Dict[DataFormat, typing.Callable[[bytes], object]]]) \ -> None: self.backends = backends self.__generate_parsers() def __generate_parser(self, data_type: DataType): """Parser generator for a specific DataType""" tag = self.PEM_TAGS[data_type] pem_loader = self.backends[data_type][DataFormat.PEM] der_loader = self.backends[data_type][DataFormat.DER] def wrapper(data: bytes): pem_type = X509Parser.pem_data_type(data) # It is in PEM format with target tag if pem_type == tag: return pem_loader(data) # It is in PEM format without target tag if pem_type: return None # It might be in DER format try: result = der_loader(data) except ValueError: result = None return result wrapper.__name__ = "{}.parser[{}]".format(type(self).__name__, tag) return wrapper def __generate_parsers(self): """Generate parsers for all support DataType""" self.parsers = {} for data_type, _ in self.PEM_TAGS.items(): self.parsers[data_type] = self.__generate_parser(data_type) def __getitem__(self, item): return self.parsers[item] @staticmethod def pem_data_type(data: bytes) -> typing.Optional[str]: """Get the tag from the data in PEM format :param data: data to be checked in binary mode. :return: PEM tag or "" when no tag detected. """ m = re.search(X509Parser.PEM_TAG_REGEX, data) if m is not None: return m.group('type').decode('UTF-8') else: return None @staticmethod def check_hex_string(hex_str: str) -> bool: """Check if the hex string is possibly DER data.""" hex_len = len(hex_str) # At least 6 hex char for 3 bytes: Type + Length + Content if hex_len < 6: return False # Check if Type (1 byte) is SEQUENCE. if hex_str[0:2] != '30': return False # Check LENGTH (1 byte) value content_len = int(hex_str[2:4], base=16) consumed = 4 if content_len in (128, 255): # Indefinite or Reserved return False elif content_len > 127: # Definite, Long length_len = (content_len - 128) * 2 content_len = int(hex_str[consumed:consumed+length_len], base=16) consumed += length_len # Check LENGTH if hex_len != content_len * 2 + consumed: return False return True class Auditor: """ A base class that uses X509Parser to parse files to a list of AuditData. A subclass must implement the following methods: - collect_default_files: Return a list of file names that are defaultly used for parsing (auditing). The list will be stored in Auditor.default_files. - parse_file: Method that parses a single file to a list of AuditData. A subclass may override the following methods: - parse_bytes: Defaultly, it parses `bytes` that contains only one valid X.509 data(DER/PEM format) to an X.509 object. - walk_all: Defaultly, it iterates over all the files in the provided file name list, calls `parse_file` for each file and stores the results by extending the `results` passed to the function. """ def __init__(self, logger): self.logger = logger self.default_files = self.collect_default_files() self.parser = X509Parser({ DataType.CRT: { DataFormat.PEM: x509.load_pem_x509_certificate, DataFormat.DER: x509.load_der_x509_certificate }, DataType.CRL: { DataFormat.PEM: x509.load_pem_x509_crl, DataFormat.DER: x509.load_der_x509_crl }, DataType.CSR: { DataFormat.PEM: x509.load_pem_x509_csr, DataFormat.DER: x509.load_der_x509_csr }, }) def collect_default_files(self) -> typing.List[str]: """Collect the default files for parsing.""" raise NotImplementedError def parse_file(self, filename: str) -> typing.List[AuditData]: """ Parse a list of AuditData from file. :param filename: name of the file to parse. :return list of AuditData parsed from the file. """ raise NotImplementedError def parse_bytes(self, data: bytes): """Parse AuditData from bytes.""" for data_type in list(DataType): try: result = self.parser[data_type](data) except ValueError as val_error: result = None self.logger.warning(val_error) if result is not None: audit_data = AuditData(data_type, result) return audit_data return None def walk_all(self, results: typing.Dict[str, AuditData], file_list: typing.Optional[typing.List[str]] = None) \ -> None: """ Iterate over all the files in the list and get audit data. The results will be written to `results` passed to this function. :param results: The dictionary used to store the parsed AuditData. The keys of this dictionary should be the identifier of the AuditData. """ if file_list is None: file_list = self.default_files for filename in file_list: data_list = self.parse_file(filename) for d in data_list: if d.identifier in results: results[d.identifier].locations.extend(d.locations) else: results[d.identifier] = d @staticmethod def find_test_dir(): """Get the relative path for the Mbed TLS test directory.""" return os.path.relpath(build_tree.guess_mbedtls_root() + '/tests') class TestDataAuditor(Auditor): """Class for auditing files in `tests/data_files/`""" def collect_default_files(self): """Collect all files in `tests/data_files/`""" test_dir = self.find_test_dir() test_data_glob = os.path.join(test_dir, 'data_files/**') data_files = [f for f in glob.glob(test_data_glob, recursive=True) if os.path.isfile(f)] return data_files def parse_file(self, filename: str) -> typing.List[AuditData]: """ Parse a list of AuditData from data file. :param filename: name of the file to parse. :return list of AuditData parsed from the file. """ with open(filename, 'rb') as f: data = f.read() results = [] # Try to parse all PEM blocks. is_pem = False for idx, m in enumerate(re.finditer(X509Parser.PEM_REGEX, data, flags=re.S), 1): is_pem = True result = self.parse_bytes(data[m.start():m.end()]) if result is not None: result.locations.append("{}#{}".format(filename, idx)) results.append(result) # Might be DER format. if not is_pem: result = self.parse_bytes(data) if result is not None: result.locations.append("{}".format(filename)) results.append(result) return results def parse_suite_data(data_f): """ Parses .data file for test arguments that possiblly have a valid X.509 data. If you need a more precise parser, please use generate_test_code.parse_test_data instead. :param data_f: file object of the data file. :return: Generator that yields test function argument list. """ for line in data_f: line = line.strip() # Skip comments if line.startswith('#'): continue # Check parameters line match = re.search(r'\A\w+(.*:)?\"', line) if match: # Read test vectors parts = re.split(r'(?[0-9a-fA-F]+)"', test_arg) if not match: continue if not X509Parser.check_hex_string(match.group('data')): continue audit_data = self.parse_bytes(bytes.fromhex(match.group('data'))) if audit_data is None: continue audit_data.locations.append("{}:{}:#{}".format(filename, data_f.line_no, idx + 1)) audit_data_list.append(audit_data) return audit_data_list def list_all(audit_data: AuditData): for loc in audit_data.locations: print("{}\t{:20}\t{:20}\t{:3}\t{}".format( audit_data.identifier, audit_data.not_valid_before.isoformat(timespec='seconds'), audit_data.not_valid_after.isoformat(timespec='seconds'), audit_data.data_type.name, loc)) def main(): """ Perform argument parsing. """ parser = argparse.ArgumentParser(description=__doc__) parser.add_argument('-a', '--all', action='store_true', help='list the information of all the files') parser.add_argument('-v', '--verbose', action='store_true', dest='verbose', help='show logs') parser.add_argument('--from', dest='start_date', help=('Start of desired validity period (UTC, YYYY-MM-DD). ' 'Default: today'), metavar='DATE') parser.add_argument('--to', dest='end_date', help=('End of desired validity period (UTC, YYYY-MM-DD). ' 'Default: --from'), metavar='DATE') parser.add_argument('--data-files', action='append', nargs='*', help='data files to audit', metavar='FILE') parser.add_argument('--suite-data-files', action='append', nargs='*', help='suite data files to audit', metavar='FILE') args = parser.parse_args() # start main routine # setup logger logger = logging.getLogger() logging_util.configure_logger(logger) logger.setLevel(logging.DEBUG if args.verbose else logging.ERROR) td_auditor = TestDataAuditor(logger) sd_auditor = SuiteDataAuditor(logger) data_files = [] suite_data_files = [] if args.data_files is None and args.suite_data_files is None: data_files = td_auditor.default_files suite_data_files = sd_auditor.default_files else: if args.data_files is not None: data_files = [x for l in args.data_files for x in l] if args.suite_data_files is not None: suite_data_files = [x for l in args.suite_data_files for x in l] # validity period start date if args.start_date: start_date = datetime.datetime.fromisoformat(args.start_date) else: start_date = datetime.datetime.today() # validity period end date if args.end_date: end_date = datetime.datetime.fromisoformat(args.end_date) else: end_date = start_date # go through all the files audit_results = {} td_auditor.walk_all(audit_results, data_files) sd_auditor.walk_all(audit_results, suite_data_files) logger.info("Total: {} objects found!".format(len(audit_results))) # we filter out the files whose validity duration covers the provided # duration. filter_func = lambda d: (start_date < d.not_valid_before) or \ (d.not_valid_after < end_date) sortby_end = lambda d: d.not_valid_after if args.all: filter_func = None # filter and output the results for d in sorted(filter(filter_func, audit_results.values()), key=sortby_end): list_all(d) logger.debug("Done!") check_cryptography_version() if __name__ == "__main__": main()