From 7e25245e381a54045f5b039de9f7f9050f6c3c3c Mon Sep 17 00:00:00 2001 From: zblurx <68540460+zblurx@users.noreply.github.com> Date: Wed, 20 Mar 2024 23:19:45 +0100 Subject: [PATCH] Implement MS-GKDI and LAPSv2 password extraction (#1556) * add MS-GKDI * update gkdi * Update getLAPSv2Password.py * Added GetLAPSPassword to examples. (#1) * Added GetLAPSPassword to examples. * update GetLapsPassword * Add LAPSv2 column --------- Co-authored-by: dru1d Co-authored-by: zblurx * update GetLAPSPassword.py * Added tab delimited outputfile option (#2) Co-authored-by: dru1d * fix requirements.txt --------- Co-authored-by: Tyler <4245930+dru1d-foofus@users.noreply.github.com> Co-authored-by: dru1d Co-authored-by: = <=> --- examples/GetLAPSPassword.py | 367 ++++++++++++++++++++++++++++++++++++ impacket/dcerpc/v5/epm.py | 2 +- impacket/dcerpc/v5/gkdi.py | 213 +++++++++++++++++++++ impacket/dpapi_ng.py | 345 +++++++++++++++++++++++++++++++++ requirements.txt | 1 + setup.py | 3 +- 6 files changed, 929 insertions(+), 2 deletions(-) create mode 100755 examples/GetLAPSPassword.py create mode 100644 impacket/dcerpc/v5/gkdi.py create mode 100644 impacket/dpapi_ng.py diff --git a/examples/GetLAPSPassword.py b/examples/GetLAPSPassword.py new file mode 100755 index 0000000000..1177eeec3c --- /dev/null +++ b/examples/GetLAPSPassword.py @@ -0,0 +1,367 @@ +#!/usr/bin/env python +# Impacket - Collection of Python classes for working with network protocols. +# +# Copyright (C) 2023 Fortra. All rights reserved. +# +# This software is provided under a slightly modified version +# of the Apache Software License. See the accompanying LICENSE file +# for more information. +# +# Description: +# This script will gather data about the domain's computers and their LAPS/LAPSv2 passwords. +# Initial formatting for this tool came from the GetADUsers.py example script. +# +# Author(s): +# Thomas Seigneuret (@zblurx) +# Tyler Booth (@dru1d-foofus) +# +# Reference for: +# LDAP +# + +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals +from datetime import datetime +from impacket import version +from impacket.dcerpc.v5 import transport +from impacket.dcerpc.v5.epm import hept_map +from impacket.dcerpc.v5.gkdi import MSRPC_UUID_GKDI, GkdiGetKey, GroupKeyEnvelope +from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, RPC_C_AUTHN_LEVEL_PKT_PRIVACY +from impacket.dpapi_ng import EncryptedPasswordBlob, KeyIdentifier, compute_kek, create_sd, decrypt_plaintext, unwrap_cek +from impacket.examples import logger +from impacket.examples.utils import parse_credentials +from impacket.ldap import ldap, ldapasn1 +from impacket.smbconnection import SMBConnection, SessionError +from pyasn1.codec.der import decoder +from pyasn1_modules import rfc5652 +import argparse +import json +import logging +import sys + +class GetLAPSPassword: + @staticmethod + def printTable(items, header, outputfile): + colLen = [] + for i, col in enumerate(header): + rowMaxLen = max([len(row[i]) for row in items]) + colLen.append(max(rowMaxLen, len(col))) + + outputFormat = ' '.join(['{%d:%ds} ' % (num, width) for num, width in enumerate(colLen)]) + + # Print header + print(outputFormat.format(*header)) + print(' '.join(['-' * itemLen for itemLen in colLen])) + for row in items: + print(outputFormat.format(*row)) + + if outputfile: + with open(outputfile, 'w') as file: + outputFormat_file = '\t'.join(['{%d:%ds}' % (num, width) for num, width in enumerate(colLen)]) # Added tab delimited output for files + file.write(outputFormat_file.format(*header) + "\n") + for row in items: + file.write((outputFormat_file.format(*row)).strip() + "\n") # Removed extraneous field to clean up output saved to a file + + def __init__(self, username, password, domain, cmdLineOptions): + self.options = cmdLineOptions + self.__username = username + self.__password = password + self.__domain = domain + self.__target = None + self.__lmhash = '' + self.__nthash = '' + self.__aesKey = cmdLineOptions.aesKey + self.__doKerberos = cmdLineOptions.k + self.__kdcIP = cmdLineOptions.dc_ip + self.__kdcHost = cmdLineOptions.dc_host + self.__targetComputer = cmdLineOptions.computer + self.__outputFile = cmdLineOptions.outputfile + self.__KDSCache = {} + + if cmdLineOptions.hashes is not None: + self.__lmhash, self.__nthash = cmdLineOptions.hashes.split(':') + + # Create the baseDN + domainParts = self.__domain.split('.') + self.baseDN = '' + for i in domainParts: + self.baseDN += 'dc=%s,' % i + # Remove last ',' + self.baseDN = self.baseDN[:-1] + + def getLAPSv2Decrypt(self, rawEncryptedLAPSBlob): + try: + encryptedLAPSBlob = EncryptedPasswordBlob(rawEncryptedLAPSBlob) + parsed_cms_data, remaining = decoder.decode(encryptedLAPSBlob['Blob'], asn1Spec=rfc5652.ContentInfo()) + enveloped_data_blob = parsed_cms_data['content'] + parsed_enveloped_data, _ = decoder.decode(enveloped_data_blob, asn1Spec=rfc5652.EnvelopedData()) + recipient_infos = parsed_enveloped_data['recipientInfos'] + kek_recipient_info = recipient_infos[0]['kekri'] + kek_identifier = kek_recipient_info['kekid'] + key_id = KeyIdentifier(bytes(kek_identifier['keyIdentifier'])) + tmp,_ = decoder.decode(kek_identifier['other']['keyAttr']) + sid = tmp['field-1'][0][0][1].asOctets().decode("utf-8") + target_sd = create_sd(sid) + laps_enabled = True + except Exception as e: + logging.error('Cannot unpack msLAPS-EncryptedPassword blob due to error %s' % str(e)) + # Check if item is in cache + if key_id['RootKeyId'] in self.__KDSCache: + gke = self.__KDSCache[key_id['RootKeyId']] + else: + # Connect on RPC over TCP to MS-GKDI to call opnum 0 GetKey + stringBinding = hept_map(destHost=self.__target, remoteIf=MSRPC_UUID_GKDI, protocol = 'ncacn_ip_tcp') + rpctransport = transport.DCERPCTransportFactory(stringBinding) + if hasattr(rpctransport, 'set_credentials'): + rpctransport.set_credentials(username=self.__username, password=self.__password, domain=self.__domain, lmhash=self.__lmhash, nthash=self.__nthash) + if self.__doKerberos: + rpctransport.set_kerberos(self.__doKerberos, kdcHost=self.__target) + if self.__kdcIP is not None: + rpctransport.setRemoteHost(self.__kdcIP) + rpctransport.setRemoteName(self.__target) + + dce = rpctransport.get_dce_rpc() + dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_INTEGRITY) + dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY) + logging.debug("Connecting to %s" % stringBinding) + try: + dce.connect() + except Exception as e: + logging.error("Something went wrong, check error status => %s" % str(e)) + return laps_enabled + logging.debug("Connected") + try: + dce.bind(MSRPC_UUID_GKDI) + except Exception as e: + logging.error("Something went wrong, check error status => %s" % str(e)) + return laps_enabled + logging.debug("Successfully bound") + + + logging.debug("Calling MS-GKDI GetKey") + resp = GkdiGetKey(dce, target_sd=target_sd, l0=key_id['L0Index'], l1=key_id['L1Index'], l2=key_id['L2Index'], root_key_id=key_id['RootKeyId']) + # Unpack GroupKeyEnvelope + gke = GroupKeyEnvelope(b''.join(resp['pbbOut'])) + self.__KDSCache[gke['RootKeyId']] = gke + + kek = compute_kek(gke, key_id) + enc_content_parameter = bytes(parsed_enveloped_data["encryptedContentInfo"]["contentEncryptionAlgorithm"]["parameters"]) + iv, _ = decoder.decode(enc_content_parameter) + iv = bytes(iv[0]) + + cek = unwrap_cek(kek, bytes(kek_recipient_info['encryptedKey'])) + return decrypt_plaintext(cek, iv, remaining) + + def getMachineName(self, target): + try: + s = SMBConnection(target, target) + s.login('', '') + except OSError as e: + if str(e).find('timed out') > 0: + raise Exception('The connection is timed out. Probably 445/TCP port is closed. Try to specify ' + 'corresponding NetBIOS name or FQDN as the value of the -dc-host option') + else: + raise + except SessionError as e: + if str(e).find('STATUS_NOT_SUPPORTED') > 0: + raise Exception('The SMB request is not supported. Probably NTLM is disabled. Try to specify ' + 'corresponding NetBIOS name or FQDN as the value of the -dc-host option') + else: + raise + except Exception: + if s.getServerName() == '': + raise Exception('Error while anonymous logging into %s' % target) + else: + s.logoff() + return s.getServerName() + + @staticmethod + def getUnixTime(t): + t -= 116444736000000000 + t /= 10000000 + return t + + def run(self): + if self.__kdcHost is not None: + self.__target = self.__kdcHost + else: + if self.__kdcIP is not None: + self.__target = self.__kdcIP + else: + self.__target = self.__domain + + if self.__doKerberos: + logging.info('Getting machine hostname') + self.__target = self.getMachineName(self.__target) + + # Connect to LDAP + try: + ldapConnection = ldap.LDAPConnection('ldap://%s' % self.__target, self.baseDN, self.__kdcIP) + if self.__doKerberos is not True: + ldapConnection.login(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash) + else: + ldapConnection.kerberosLogin(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash, + self.__aesKey, kdcHost=self.__kdcIP) + except ldap.LDAPSessionError as e: + if str(e).find('strongerAuthRequired') >= 0: + # We need to try SSL + ldapConnection = ldap.LDAPConnection('ldaps://%s' % self.__target, self.baseDN, self.__kdcIP) + if self.__doKerberos is not True: + ldapConnection.login(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash) + else: + ldapConnection.kerberosLogin(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash, + self.__aesKey, kdcHost=self.__kdcIP) + else: + if str(e).find('NTLMAuthNegotiate') >= 0: + logging.critical("NTLM negotiation failed. Probably NTLM is disabled. Try to use Kerberos " + "authentication instead.") + else: + if self.__kdcIP is not None and self.__kdcHost is not None: + logging.critical("If the credentials are valid, check the hostname and IP address of KDC. They " + "must match exactly each other.") + raise + + # Building the search filter + searchFilter = "(&(objectCategory=computer)(|(msLAPS-EncryptedPassword=*)(ms-MCS-AdmPwd=*)(msLAPS-Password=*))" # Default search filter value + if self.__targetComputer is not None: + searchFilter += '(name=' + self.__targetComputer + ')' + searchFilter += ")" + + try: + # Microsoft Active Directory set an hard limit of 1000 entries returned by any search + paged_search_control = ldapasn1.SimplePagedResultsControl(criticality=True, size=1000) + + resp = ldapConnection.search(searchFilter=searchFilter, + attributes=['msLAPS-EncryptedPassword', 'msLAPS-PasswordExpirationTime', 'msLAPS-Password', 'sAMAccountName', \ + 'ms-Mcs-AdmPwdExpirationTime', 'ms-MCS-AdmPwd'], + searchControls=[paged_search_control]) + + except ldap.LDAPSearchError as e: + if e.getErrorString().find('sizeLimitExceeded') >= 0: + # We should never reach this code as we use paged search now + logging.debug('sizeLimitExceeded exception caught, giving up and processing the data received') + resp = e.getAnswers() + pass + else: + raise + + entries = [] + + logging.debug('Total of records returned %d' % len(resp)) + + if len(resp) == 0: + if self.__targetComputer is not None: + logging.error('%s$ not found in LDAP.' % self.__targetComputer) + else: + logging.error("No valid entry in LDAP") + return + for item in resp: + if isinstance(item, ldapasn1.SearchResultEntry) is not True: + continue + try: + sAMAccountName = None + lapsPasswordExpiration = None + lapsUsername = None + lapsPassword = None + lapsv2 = False + for attribute in item['attributes']: + if str(attribute['type']) == 'sAMAccountName': + sAMAccountName = str(attribute['vals'][0]) + if str(attribute['type']) == 'msLAPS-EncryptedPassword': + lapsv2 = True + plaintext = self.getLAPSv2Decrypt(bytes(attribute['vals'][0])) + r = json.loads(plaintext[:-18].decode('utf-16le')) + # timestamp = r["t"] + lapsUsername = r["n"] + lapsPassword = r["p"] + elif str(attribute['type']) == 'ms-Mcs-AdmPwdExpirationTime' or str(attribute['type']) == 'msLAPS-PasswordExpirationTime': + if str(attribute['vals'][0]) != '0': + lapsPasswordExpiration = datetime.fromtimestamp(self.getUnixTime(int(str(attribute['vals'][0])))).strftime('%Y-%m-%d %H:%M:%S') + elif str(attribute['type']) == 'ms-Mcs-AdmPwd': + lapsPassword = attribute['vals'][0].asOctets().decode('utf-8') + if sAMAccountName is not None and lapsPassword is not None: + entry = [sAMAccountName,lapsUsername, lapsPassword, lapsPasswordExpiration, str(lapsv2)] + entry = [element if element is not None else 'N/A' for element in entry] + entries.append(entry) + except Exception as e: + logging.error('Skipping item, cannot process due to error %s' % str(e)) + pass + + if len(entries) == 0: + if self.__targetComputer is not None: + logging.error("No LAPS data returned for %s" % self.__targetComputer) + else: + logging.error("No LAPS data returned") + return + + self.printTable(entries,['Host','LAPS Username','LAPS Password','LAPS Password Expiration', 'LAPSv2'], self.__outputFile) + +# Process command-line arguments. +if __name__ == '__main__': + print((version.BANNER)) + + parser = argparse.ArgumentParser(add_help = True, description = "Extract LAPS passwords from LDAP") + + parser.add_argument('target', action='store', help='domain[/username[:password]]') + parser.add_argument('-computer', action='store', metavar='computername', help='Target a specific computer by its name') + + parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output') + parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON') + parser.add_argument('-outputfile', '-o', action='store', help='Outputs to a file.') + + group = parser.add_argument_group('authentication') + group.add_argument('-hashes', action="store", metavar = "LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH') + group.add_argument('-no-pass', action="store_true", help='don\'t ask for password (useful for -k)') + group.add_argument('-k', action="store_true", help='Use Kerberos authentication. Grabs credentials from ccache file ' + '(KRB5CcnAME) based on target parameters. If valid credentials ' + 'cannot be found, it will use the ones specified in the command ' + 'line') + group.add_argument('-aesKey', action="store", metavar = "hex key", help='AES key to use for Kerberos Authentication ' + '(128 or 256 bits)') + + group = parser.add_argument_group('connection') + group.add_argument('-dc-ip', action='store', metavar='ip address', help='IP Address of the domain controller. If ' + 'ommited it use the domain part (FQDN) ' + 'specified in the target parameter') + group.add_argument('-dc-host', action='store', metavar='hostname', help='Hostname of the domain controller to use. ' + 'If ommited, the domain part (FQDN) ' + 'specified in the account parameter will be used') + + if len(sys.argv)==1: + parser.print_help() + sys.exit(1) + + options = parser.parse_args() + + # Init the example's logger theme + logger.init(options.ts) + + if options.debug is True: + logging.getLogger().setLevel(logging.DEBUG) + # Print the Library's installation path + logging.debug(version.getInstallationPath()) + else: + logging.getLogger().setLevel(logging.INFO) + + domain, username, password = parse_credentials(options.target) + + if domain == '': + logging.critical('Domain should be specified!') + sys.exit(1) + + if password == '' and username != '' and options.hashes is None and options.no_pass is False and options.aesKey is None: + from getpass import getpass + password = getpass("Password:") + + if options.aesKey is not None: + options.k = True + + try: + executer = GetLAPSPassword(username, password, domain, options) + executer.run() + except Exception as e: + if logging.getLogger().level == logging.DEBUG: + import traceback + traceback.print_exc() + logging.error(str(e)) \ No newline at end of file diff --git a/impacket/dcerpc/v5/epm.py b/impacket/dcerpc/v5/epm.py index 53b941809b..e9feb02f8f 100644 --- a/impacket/dcerpc/v5/epm.py +++ b/impacket/dcerpc/v5/epm.py @@ -1315,7 +1315,7 @@ def hept_map(destHost, remoteIf, dataRepresentation = uuidtup_to_bin(('8a885d04- tower['Floors'] = interface.getData() + dataRep.getData() + protId.getData() + transportData request = ept_map() - request['max_towers'] = 1 + request['max_towers'] = 4 request['map_tower']['tower_length'] = len(tower) request['map_tower']['tower_octet_string'] = tower.getData() diff --git a/impacket/dcerpc/v5/gkdi.py b/impacket/dcerpc/v5/gkdi.py new file mode 100644 index 0000000000..0a5e4d4644 --- /dev/null +++ b/impacket/dcerpc/v5/gkdi.py @@ -0,0 +1,213 @@ +from impacket.dcerpc.v5.ndr import NDRCALL, NDRPOINTER, NDRUniConformantArray +from impacket.dcerpc.v5.dtypes import ULONG, PGUID, LONG, NTSTATUS, NULL +from impacket.dcerpc.v5.rpcrt import DCERPCException +from impacket import hresult_errors +from impacket.structure import Structure +from impacket.uuid import uuidtup_to_bin + +class DCERPCSessionError(DCERPCException): + def __init__(self, error_string=None, error_code=None, packet=None): + DCERPCException.__init__(self, error_string, error_code, packet) + + def __str__( self ): + key = self.error_code + if key in hresult_errors.ERROR_MESSAGES: + error_msg_short = hresult_errors.ERROR_MESSAGES[key][0] + error_msg_verbose = hresult_errors.ERROR_MESSAGES[key][1] + return 'GKDI SessionError: code: 0x%x - %s - %s' % (self.error_code, error_msg_short, error_msg_verbose) + else: + return 'GKDI SessionError: unknown error code: 0x%x' % self.error_code + +################################################################################ +# CONSTANTS +################################################################################ + +MSRPC_UUID_GKDI = uuidtup_to_bin(('B9785960-524F-11DF-8B6D-83DCDED72085','1.0')) + +################################################################################ +# STRUCTURES +################################################################################ + +# 2.2.1 KDF Parameters +class KDFParameter(Structure): + structure = ( + ('Unknown1','`_. + + Modified version for Impacket, accepting null_bytes + + Args: + master (byte string): + The secret value used by the KDF to derive the other keys. + It must not be a password. + The length on the secret must be consistent with the input expected by + the :data:`prf` function. + key_len (integer): + The length in bytes of each derived key. + prf (function): + A pseudorandom function that takes two byte strings as parameters: + the secret and an input. It returns another byte string. + num_keys (integer): + The number of keys to derive. Every key is :data:`key_len` bytes long. + By default, only 1 key is derived. + label (byte string): + Optional description of the purpose of the derived keys. + It must not contain zero bytes. + context (byte string): + Optional information pertaining to + the protocol that uses the keys, such as the identity of the + participants, nonces, session IDs, etc. + It must not contain zero bytes. + + Return: + - a byte string (if ``num_keys`` is not specified), or + - a tuple of byte strings (if ``num_key`` is specified). + """ + + if num_keys is None: + num_keys = 1 + + key_len_enc = long_to_bytes(key_len * num_keys * 8, 4) + output_len = key_len * num_keys + + i = 1 + dk = b"" + while len(dk) < output_len: + info = long_to_bytes(i, 4) + label + b'\x00' + context + key_len_enc + dk += prf(master, info) + i += 1 + if i > 0xFFFFFFFF: + raise ValueError("Overflow in SP800 108 counter") + + if num_keys == 1: + return dk[:key_len] + else: + kol = [dk[idx:idx + key_len] + for idx in iter_range(0, output_len, key_len)] + return kol + +class KeyIdentifier(Structure): + structure = ( + ('Version', ' bool: + return bool(self['Flags'] & 1) + +class EncryptedPasswordBlob(Structure): + structure = ( + ('Timestamp_lower', ' bytes: + return n.to_bytes(length=4, byteorder="big") + +def create_ace(sid, mask): + nace = ACE() + nace['AceType'] = ACCESS_ALLOWED_ACE.ACE_TYPE + nace['AceFlags'] = 0x00 + acedata = ACCESS_ALLOWED_ACE() + acedata['Mask'] = ACCESS_MASK() + acedata['Mask']['Mask'] = mask + acedata['Sid'] = LDAP_SID() + acedata['Sid'].fromCanonical(sid) + nace['Ace'] = acedata + return nace + +def create_sd(sid): + sd = SR_SECURITY_DESCRIPTOR() + sd['Revision'] = b'\x01' + sd['Sbz1'] = b'\x00' + sd['Control'] = 32772 + sd['OwnerSid'] = LDAP_SID() + sd['OwnerSid'].fromCanonical('S-1-5-18') + sd['GroupSid'] = LDAP_SID() + sd['GroupSid'].fromCanonical('S-1-5-18') + sd['Sacl'] = b'' + + acl = ACL() + acl['AclRevision'] = 2 + acl['Sbz1'] = 0 + acl['Sbz2'] = 0 + acl.aces = [] + acl.aces.append(create_ace(sid, 3)) + acl.aces.append(create_ace('S-1-1-0',2)) + sd['Dacl'] = acl + return sd + +def compute_kdf_hash(length, key_material, otherinfo): + output = [b""] + outlen = 0 + counter = 1 + + while length > outlen: + hash_module = SHA256.SHA256Hash() + hash_module.update(data = int_to_u32be(counter)) + hash_module.update(key_material) + hash_module.update(otherinfo) + output.append(hash_module.digest()) + outlen += len(output[-1]) + counter += 1 + + return b"".join(output)[:length] + +def compute_kdf_context(key_guid, l0, l1, l2): + return b"".join( + [ + key_guid, + l0.to_bytes(4, byteorder="little", signed=True), + l1.to_bytes(4, byteorder="little", signed=True), + l2.to_bytes(4, byteorder="little", signed=True), + ] + ) + +def kdf(hash_alg_str, secret, label, context, length): + hash_alg = SHA512 + if 'SHA512' in hash_alg_str: + hash_alg = SHA512 + elif 'SHA256' in hash_alg_str: + hash_alg = SHA256 + + def prf(s,x): + return HMAC.new(s,x,hash_alg).digest() + + return SP800_108_Counter( + master=secret, + prf=prf, + key_len=length, + label=label, + context=context + ) + +def compute_l2_key(key_id: KeyIdentifier, gke: GroupKeyEnvelope): + l1 = gke["L1Index"] + l1_key = gke["L1Key"] + l2 = gke["L2Index"] + l2_key = gke["L2Key"] + + reseed_l2 = l2 == 31 or l1 != key_id["L1Index"] + + kdf_param = gke["KdfPara"]["HashName"].decode('utf-16le') + + if l2 != 31 and l1 != key_id["L1Index"]: + l1 -= 1 + + while l1 != key_id["L1Index"]: + reseed_l2 = True + l1 -= 1 + + l1_key = kdf( + kdf_param, + l1_key, + KDS_SERVICE_LABEL, + compute_kdf_context( + gke["RootKeyId"], + gke["L0Index"], + l1, + -1 + ), + 64 + ) + + if reseed_l2: + l2 = 31 + l2_key = kdf( + kdf_param, + l1_key, + KDS_SERVICE_LABEL, + compute_kdf_context( + gke["RootKeyId"], + gke["L0Index"], + l1, + l2, + ), + 64, + ) + + while l2 != key_id["L2Index"]: + l2 -= 1 + + l2_key = kdf( + kdf_param, + l2_key, + KDS_SERVICE_LABEL, + compute_kdf_context( + gke["RootKeyId"], + gke["L0Index"], + l1, + l2, + ), + 64, + ) + + return l2_key + +def generate_kek_secret_from_pubkey(gke: GroupKeyEnvelope, key_id: KeyIdentifier,l2_key: bytes): + private_key = kdf( + gke["KdfPara"]["HashName"].decode('utf-16le'), + l2_key, + KDS_SERVICE_LABEL, + gke['SecAlgo'], + math.ceil(gke["PrivKeyLength"] / 8), + ) + if gke['SecAlgo'].decode('utf-16le').encode() == b"DH\0": + ffcdh_key = FFCDHKey(key_id["Unknown"]) + shared_secret_int = pow( + int.from_bytes(ffcdh_key['PubKey'], byteorder="big"), + int.from_bytes(private_key, byteorder="big"), + int.from_bytes(ffcdh_key['FieldOrder'], byteorder="big"), + ) + shared_secret = shared_secret_int.to_bytes((shared_secret_int.bit_length() + 7) // 8, byteorder="big") + elif "ECDH_P" in gke['SecAlgo'].decode('utf-16le'): + ecdh_key = ECDHKey(key_id["Unknown"]) + # not yet supported + return + kek_context = "KDS public key\0".encode("utf-16le") + otherinfo = "SHA512\0".encode("utf-16le") + kek_context + KDS_SERVICE_LABEL + return compute_kdf_hash(length=32, otherinfo=otherinfo, key_material=shared_secret), kek_context + +def compute_kek(gke: GroupKeyEnvelope, key_id: KeyIdentifier): + kek_context = None + kek_secret = None + + l2_key = compute_l2_key(key_id, gke) + + if key_id.is_public_key(): + kek_secret, kek_context = generate_kek_secret_from_pubkey(gke=gke, key_id=key_id, l2_key=l2_key) + else: + kek_secret = l2_key + kek_context = key_id["Unknown"] + + return kdf( + gke["KdfPara"]["HashName"].decode('utf-16le'), + kek_secret, + KDS_SERVICE_LABEL, + kek_context, + 32 + ) + +def aes_unwrap(wrapping_key: bytes, wrapped_key: bytes): + aiv = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6" + r = [wrapped_key[i : i + 8] for i in range(0, len(wrapped_key), 8)] + a = r.pop(0) + decryptor = AES.new(wrapping_key, AES.MODE_ECB) + n = len(r) + for j in reversed(range(6)): + for i in reversed(range(n)): + atr = ( + int.from_bytes(a, byteorder="big") ^ ((n * j) + i + 1) + ).to_bytes(length=8, byteorder="big") + r[i] + # every decryption operation is a discrete 16 byte chunk so + # it is safe to reuse the decryptor for the entire operation + b = decryptor.decrypt(atr) + # b = decryptor.update(atr) + a = b[:8] + r[i] = b[-8:] + if a == aiv: + return b"".join(r) + else: + return None + +def unwrap_cek(kek, encrypted_cek): + r = aes_unwrap(kek, encrypted_cek) + if r is None: + raise ValueError("Could not unwrap key") + return r + +def decrypt_plaintext(cek, iv, encrypted_blob): + cipher = AES.new(cek, AES.MODE_GCM, nonce=iv) + return cipher.decrypt(encrypted_blob) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index f6cb46a927..92b49c475d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ setuptools six charset_normalizer pyasn1>=0.2.3 +pyasn1_modules pycryptodomex pyOpenSSL>=21.0.0 ldap3>=2.5,!=2.5.2,!=2.5.0,!=2.6 diff --git a/setup.py b/setup.py index fe3fd98e65..192f4ded3b 100644 --- a/setup.py +++ b/setup.py @@ -67,7 +67,8 @@ def read(fname): 'impacket.examples.ntlmrelayx.attacks', 'impacket.examples.ntlmrelayx.attacks.httpattacks'], scripts=glob.glob(os.path.join('examples', '*.py')), data_files=data_files, - install_requires=['pyasn1>=0.2.3', 'pycryptodomex', 'pyOpenSSL>=21.0.0', 'six', 'ldap3>=2.5,!=2.5.2,!=2.5.0,!=2.6', + + install_requires=['pyasn1>=0.2.3', 'pyasn1_modules', 'pycryptodomex', 'pyOpenSSL>=21.0.0', 'six', 'ldap3>=2.5,!=2.5.2,!=2.5.0,!=2.6', 'ldapdomaindump>=0.9.0', 'flask>=1.0', 'setuptools', 'charset_normalizer'], extras_require={'pyreadline:sys_platform=="win32"': [], },