Skip to content

Commit

Permalink
Implement MS-GKDI and LAPSv2 password extraction (fortra#1556)
Browse files Browse the repository at this point in the history
* add MS-GKDI

* update gkdi

* Update getLAPSv2Password.py

* Added GetLAPSPassword to examples. (#1)

* Added GetLAPSPassword to examples.

* update GetLapsPassword

* Add LAPSv2 column

---------

Co-authored-by: dru1d <[email protected]>
Co-authored-by: zblurx <[email protected]>

* update GetLAPSPassword.py

* Added tab delimited outputfile option (#2)

Co-authored-by: dru1d <[email protected]>

* fix requirements.txt

---------

Co-authored-by: Tyler <[email protected]>
Co-authored-by: dru1d <[email protected]>
Co-authored-by: = <=>
  • Loading branch information
3 people authored Mar 20, 2024
1 parent 0d2b72a commit 7e25245
Show file tree
Hide file tree
Showing 6 changed files with 929 additions and 2 deletions.
367 changes: 367 additions & 0 deletions examples/GetLAPSPassword.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,367 @@
#!/usr/bin/env python
# Impacket - Collection of Python classes for working with network protocols.
#
# Copyright (C) 2023 Fortra. All rights reserved.
#
# This software is provided under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# Description:
# This script will gather data about the domain's computers and their LAPS/LAPSv2 passwords.
# Initial formatting for this tool came from the GetADUsers.py example script.
#
# Author(s):
# Thomas Seigneuret (@zblurx)
# Tyler Booth (@dru1d-foofus)
#
# Reference for:
# LDAP
#

from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from datetime import datetime
from impacket import version
from impacket.dcerpc.v5 import transport
from impacket.dcerpc.v5.epm import hept_map
from impacket.dcerpc.v5.gkdi import MSRPC_UUID_GKDI, GkdiGetKey, GroupKeyEnvelope
from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, RPC_C_AUTHN_LEVEL_PKT_PRIVACY
from impacket.dpapi_ng import EncryptedPasswordBlob, KeyIdentifier, compute_kek, create_sd, decrypt_plaintext, unwrap_cek
from impacket.examples import logger
from impacket.examples.utils import parse_credentials
from impacket.ldap import ldap, ldapasn1
from impacket.smbconnection import SMBConnection, SessionError
from pyasn1.codec.der import decoder
from pyasn1_modules import rfc5652
import argparse
import json
import logging
import sys

class GetLAPSPassword:
@staticmethod
def printTable(items, header, outputfile):
colLen = []
for i, col in enumerate(header):
rowMaxLen = max([len(row[i]) for row in items])
colLen.append(max(rowMaxLen, len(col)))

outputFormat = ' '.join(['{%d:%ds} ' % (num, width) for num, width in enumerate(colLen)])

# Print header
print(outputFormat.format(*header))
print(' '.join(['-' * itemLen for itemLen in colLen]))
for row in items:
print(outputFormat.format(*row))

if outputfile:
with open(outputfile, 'w') as file:
outputFormat_file = '\t'.join(['{%d:%ds}' % (num, width) for num, width in enumerate(colLen)]) # Added tab delimited output for files
file.write(outputFormat_file.format(*header) + "\n")
for row in items:
file.write((outputFormat_file.format(*row)).strip() + "\n") # Removed extraneous field to clean up output saved to a file

def __init__(self, username, password, domain, cmdLineOptions):
self.options = cmdLineOptions
self.__username = username
self.__password = password
self.__domain = domain
self.__target = None
self.__lmhash = ''
self.__nthash = ''
self.__aesKey = cmdLineOptions.aesKey
self.__doKerberos = cmdLineOptions.k
self.__kdcIP = cmdLineOptions.dc_ip
self.__kdcHost = cmdLineOptions.dc_host
self.__targetComputer = cmdLineOptions.computer
self.__outputFile = cmdLineOptions.outputfile
self.__KDSCache = {}

if cmdLineOptions.hashes is not None:
self.__lmhash, self.__nthash = cmdLineOptions.hashes.split(':')

# Create the baseDN
domainParts = self.__domain.split('.')
self.baseDN = ''
for i in domainParts:
self.baseDN += 'dc=%s,' % i
# Remove last ','
self.baseDN = self.baseDN[:-1]

def getLAPSv2Decrypt(self, rawEncryptedLAPSBlob):
try:
encryptedLAPSBlob = EncryptedPasswordBlob(rawEncryptedLAPSBlob)
parsed_cms_data, remaining = decoder.decode(encryptedLAPSBlob['Blob'], asn1Spec=rfc5652.ContentInfo())
enveloped_data_blob = parsed_cms_data['content']
parsed_enveloped_data, _ = decoder.decode(enveloped_data_blob, asn1Spec=rfc5652.EnvelopedData())
recipient_infos = parsed_enveloped_data['recipientInfos']
kek_recipient_info = recipient_infos[0]['kekri']
kek_identifier = kek_recipient_info['kekid']
key_id = KeyIdentifier(bytes(kek_identifier['keyIdentifier']))
tmp,_ = decoder.decode(kek_identifier['other']['keyAttr'])
sid = tmp['field-1'][0][0][1].asOctets().decode("utf-8")
target_sd = create_sd(sid)
laps_enabled = True
except Exception as e:
logging.error('Cannot unpack msLAPS-EncryptedPassword blob due to error %s' % str(e))
# Check if item is in cache
if key_id['RootKeyId'] in self.__KDSCache:
gke = self.__KDSCache[key_id['RootKeyId']]
else:
# Connect on RPC over TCP to MS-GKDI to call opnum 0 GetKey
stringBinding = hept_map(destHost=self.__target, remoteIf=MSRPC_UUID_GKDI, protocol = 'ncacn_ip_tcp')
rpctransport = transport.DCERPCTransportFactory(stringBinding)
if hasattr(rpctransport, 'set_credentials'):
rpctransport.set_credentials(username=self.__username, password=self.__password, domain=self.__domain, lmhash=self.__lmhash, nthash=self.__nthash)
if self.__doKerberos:
rpctransport.set_kerberos(self.__doKerberos, kdcHost=self.__target)
if self.__kdcIP is not None:
rpctransport.setRemoteHost(self.__kdcIP)
rpctransport.setRemoteName(self.__target)

dce = rpctransport.get_dce_rpc()
dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_INTEGRITY)
dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY)
logging.debug("Connecting to %s" % stringBinding)
try:
dce.connect()
except Exception as e:
logging.error("Something went wrong, check error status => %s" % str(e))
return laps_enabled
logging.debug("Connected")
try:
dce.bind(MSRPC_UUID_GKDI)
except Exception as e:
logging.error("Something went wrong, check error status => %s" % str(e))
return laps_enabled
logging.debug("Successfully bound")


logging.debug("Calling MS-GKDI GetKey")
resp = GkdiGetKey(dce, target_sd=target_sd, l0=key_id['L0Index'], l1=key_id['L1Index'], l2=key_id['L2Index'], root_key_id=key_id['RootKeyId'])
# Unpack GroupKeyEnvelope
gke = GroupKeyEnvelope(b''.join(resp['pbbOut']))
self.__KDSCache[gke['RootKeyId']] = gke

kek = compute_kek(gke, key_id)
enc_content_parameter = bytes(parsed_enveloped_data["encryptedContentInfo"]["contentEncryptionAlgorithm"]["parameters"])
iv, _ = decoder.decode(enc_content_parameter)
iv = bytes(iv[0])

cek = unwrap_cek(kek, bytes(kek_recipient_info['encryptedKey']))
return decrypt_plaintext(cek, iv, remaining)

def getMachineName(self, target):
try:
s = SMBConnection(target, target)
s.login('', '')
except OSError as e:
if str(e).find('timed out') > 0:
raise Exception('The connection is timed out. Probably 445/TCP port is closed. Try to specify '
'corresponding NetBIOS name or FQDN as the value of the -dc-host option')
else:
raise
except SessionError as e:
if str(e).find('STATUS_NOT_SUPPORTED') > 0:
raise Exception('The SMB request is not supported. Probably NTLM is disabled. Try to specify '
'corresponding NetBIOS name or FQDN as the value of the -dc-host option')
else:
raise
except Exception:
if s.getServerName() == '':
raise Exception('Error while anonymous logging into %s' % target)
else:
s.logoff()
return s.getServerName()

@staticmethod
def getUnixTime(t):
t -= 116444736000000000
t /= 10000000
return t

def run(self):
if self.__kdcHost is not None:
self.__target = self.__kdcHost
else:
if self.__kdcIP is not None:
self.__target = self.__kdcIP
else:
self.__target = self.__domain

if self.__doKerberos:
logging.info('Getting machine hostname')
self.__target = self.getMachineName(self.__target)

# Connect to LDAP
try:
ldapConnection = ldap.LDAPConnection('ldap://%s' % self.__target, self.baseDN, self.__kdcIP)
if self.__doKerberos is not True:
ldapConnection.login(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash)
else:
ldapConnection.kerberosLogin(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash,
self.__aesKey, kdcHost=self.__kdcIP)
except ldap.LDAPSessionError as e:
if str(e).find('strongerAuthRequired') >= 0:
# We need to try SSL
ldapConnection = ldap.LDAPConnection('ldaps://%s' % self.__target, self.baseDN, self.__kdcIP)
if self.__doKerberos is not True:
ldapConnection.login(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash)
else:
ldapConnection.kerberosLogin(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash,
self.__aesKey, kdcHost=self.__kdcIP)
else:
if str(e).find('NTLMAuthNegotiate') >= 0:
logging.critical("NTLM negotiation failed. Probably NTLM is disabled. Try to use Kerberos "
"authentication instead.")
else:
if self.__kdcIP is not None and self.__kdcHost is not None:
logging.critical("If the credentials are valid, check the hostname and IP address of KDC. They "
"must match exactly each other.")
raise

# Building the search filter
searchFilter = "(&(objectCategory=computer)(|(msLAPS-EncryptedPassword=*)(ms-MCS-AdmPwd=*)(msLAPS-Password=*))" # Default search filter value
if self.__targetComputer is not None:
searchFilter += '(name=' + self.__targetComputer + ')'
searchFilter += ")"

try:
# Microsoft Active Directory set an hard limit of 1000 entries returned by any search
paged_search_control = ldapasn1.SimplePagedResultsControl(criticality=True, size=1000)

resp = ldapConnection.search(searchFilter=searchFilter,
attributes=['msLAPS-EncryptedPassword', 'msLAPS-PasswordExpirationTime', 'msLAPS-Password', 'sAMAccountName', \
'ms-Mcs-AdmPwdExpirationTime', 'ms-MCS-AdmPwd'],
searchControls=[paged_search_control])

except ldap.LDAPSearchError as e:
if e.getErrorString().find('sizeLimitExceeded') >= 0:
# We should never reach this code as we use paged search now
logging.debug('sizeLimitExceeded exception caught, giving up and processing the data received')
resp = e.getAnswers()
pass
else:
raise

entries = []

logging.debug('Total of records returned %d' % len(resp))

if len(resp) == 0:
if self.__targetComputer is not None:
logging.error('%s$ not found in LDAP.' % self.__targetComputer)
else:
logging.error("No valid entry in LDAP")
return
for item in resp:
if isinstance(item, ldapasn1.SearchResultEntry) is not True:
continue
try:
sAMAccountName = None
lapsPasswordExpiration = None
lapsUsername = None
lapsPassword = None
lapsv2 = False
for attribute in item['attributes']:
if str(attribute['type']) == 'sAMAccountName':
sAMAccountName = str(attribute['vals'][0])
if str(attribute['type']) == 'msLAPS-EncryptedPassword':
lapsv2 = True
plaintext = self.getLAPSv2Decrypt(bytes(attribute['vals'][0]))
r = json.loads(plaintext[:-18].decode('utf-16le'))
# timestamp = r["t"]
lapsUsername = r["n"]
lapsPassword = r["p"]
elif str(attribute['type']) == 'ms-Mcs-AdmPwdExpirationTime' or str(attribute['type']) == 'msLAPS-PasswordExpirationTime':
if str(attribute['vals'][0]) != '0':
lapsPasswordExpiration = datetime.fromtimestamp(self.getUnixTime(int(str(attribute['vals'][0])))).strftime('%Y-%m-%d %H:%M:%S')
elif str(attribute['type']) == 'ms-Mcs-AdmPwd':
lapsPassword = attribute['vals'][0].asOctets().decode('utf-8')
if sAMAccountName is not None and lapsPassword is not None:
entry = [sAMAccountName,lapsUsername, lapsPassword, lapsPasswordExpiration, str(lapsv2)]
entry = [element if element is not None else 'N/A' for element in entry]
entries.append(entry)
except Exception as e:
logging.error('Skipping item, cannot process due to error %s' % str(e))
pass

if len(entries) == 0:
if self.__targetComputer is not None:
logging.error("No LAPS data returned for %s" % self.__targetComputer)
else:
logging.error("No LAPS data returned")
return

self.printTable(entries,['Host','LAPS Username','LAPS Password','LAPS Password Expiration', 'LAPSv2'], self.__outputFile)

# Process command-line arguments.
if __name__ == '__main__':
print((version.BANNER))

parser = argparse.ArgumentParser(add_help = True, description = "Extract LAPS passwords from LDAP")

parser.add_argument('target', action='store', help='domain[/username[:password]]')
parser.add_argument('-computer', action='store', metavar='computername', help='Target a specific computer by its name')

parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output')
parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON')
parser.add_argument('-outputfile', '-o', action='store', help='Outputs to a file.')

group = parser.add_argument_group('authentication')
group.add_argument('-hashes', action="store", metavar = "LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH')
group.add_argument('-no-pass', action="store_true", help='don\'t ask for password (useful for -k)')
group.add_argument('-k', action="store_true", help='Use Kerberos authentication. Grabs credentials from ccache file '
'(KRB5CcnAME) based on target parameters. If valid credentials '
'cannot be found, it will use the ones specified in the command '
'line')
group.add_argument('-aesKey', action="store", metavar = "hex key", help='AES key to use for Kerberos Authentication '
'(128 or 256 bits)')

group = parser.add_argument_group('connection')
group.add_argument('-dc-ip', action='store', metavar='ip address', help='IP Address of the domain controller. If '
'ommited it use the domain part (FQDN) '
'specified in the target parameter')
group.add_argument('-dc-host', action='store', metavar='hostname', help='Hostname of the domain controller to use. '
'If ommited, the domain part (FQDN) '
'specified in the account parameter will be used')

if len(sys.argv)==1:
parser.print_help()
sys.exit(1)

options = parser.parse_args()

# Init the example's logger theme
logger.init(options.ts)

if options.debug is True:
logging.getLogger().setLevel(logging.DEBUG)
# Print the Library's installation path
logging.debug(version.getInstallationPath())
else:
logging.getLogger().setLevel(logging.INFO)

domain, username, password = parse_credentials(options.target)

if domain == '':
logging.critical('Domain should be specified!')
sys.exit(1)

if password == '' and username != '' and options.hashes is None and options.no_pass is False and options.aesKey is None:
from getpass import getpass
password = getpass("Password:")

if options.aesKey is not None:
options.k = True

try:
executer = GetLAPSPassword(username, password, domain, options)
executer.run()
except Exception as e:
if logging.getLogger().level == logging.DEBUG:
import traceback
traceback.print_exc()
logging.error(str(e))
2 changes: 1 addition & 1 deletion impacket/dcerpc/v5/epm.py
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,7 @@ def hept_map(destHost, remoteIf, dataRepresentation = uuidtup_to_bin(('8a885d04-
tower['Floors'] = interface.getData() + dataRep.getData() + protId.getData() + transportData

request = ept_map()
request['max_towers'] = 1
request['max_towers'] = 4
request['map_tower']['tower_length'] = len(tower)
request['map_tower']['tower_octet_string'] = tower.getData()

Expand Down
Loading

0 comments on commit 7e25245

Please sign in to comment.