Skip to content

Commit

Permalink
Fix SSH Key creation
Browse files Browse the repository at this point in the history
  • Loading branch information
Kolman-Freecss committed Oct 7, 2024
1 parent 1cc2764 commit fdc441b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 15 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ docker push kolmanfreecss/jenkins-git
- ```bash
dos2unix YOUR_SCRIPT.helpers
```
- Create SSH credentials on Jenkins through Python with Jenkins API. Problem with the XML tag using incorrect format for the implementation
- Solution: Use `com.cloudbees.jenkins.plugins.sshcredentials.impl.BasicSSHUserPrivateKey`
## AWS
- Check SSH key permissions to connect to EC2 instance.
Expand Down
2 changes: 1 addition & 1 deletion src/local/main/init_jenkins.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def fetch():

# ----------- Generate SSH key pair -----------
print('Getting SSH key...')
private_key = services.get_ssh() # Get the private key from the SSH key pair to connect Jenkins node via SSH to the agent (machine defined)
services.get_ssh() # Get the private key from the SSH key pair to connect Jenkins node via SSH to the agent (machine defined)
services.build_credentials(CredentialsType.SSH)

# Jenkins version
Expand Down
43 changes: 29 additions & 14 deletions src/local/main/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,34 @@ def build_user_credentials() -> any:
'''
return credentials

def build_ssh_credentials() -> any:
def build_ssh_credentials(force: bool = False) -> any:
"""
Create SSH credentials in Jenkins
:param force: Flag to force the creation of credentials even if they already exist
:return:
"""
private_key = get_ssh()
private_key = get_ssh(force)

id_value = f"{config_module.config.get(config_module.ConfigKeys.JENKINS_CREDENTIALS_ID)}-ssh"
credentials = f'''<?xml version='1.1' encoding='UTF-8'?>
<com.cloudbees.plugins.credentials.impl.SSHUserPrivateKey>
<com.cloudbees.jenkins.plugins.sshcredentials.impl.BasicSSHUserPrivateKey>
<scope>GLOBAL</scope>
<id>{id_value}</id>
<username>{config_module.config.get(config_module.ConfigKeys.JENKINS_USER)}</username>
<privateKey>{private_key}</privateKey>
<privateKeySource class="com.cloudbees.jenkins.plugins.sshcredentials.impl.BasicSSHUserPrivateKey$DirectEntryPrivateKeySource">
<privateKey>{private_key}</privateKey>
</privateKeySource>
<description>SSH Credentials to access GitHub</description>
</com.cloudbees.plugins.credentials.impl.SSHUserPrivateKey>
</com.cloudbees.jenkins.plugins.sshcredentials.impl.BasicSSHUserPrivateKey>
'''
print(f'BUILD SSH CREDENTIALS:: XML Document: {credentials}')
return credentials


def build_credentials(credential_type: CredentialsType) -> any:
def build_credentials(credential_type: CredentialsType, force: bool = False) -> any:
"""
Create credentials in Jenkins based on the type specified
:param force: Flag to force the creation of credentials even if they already exist
:param credential_type: CredentialType Enum specifying the type of credentials
:return:
"""
Expand All @@ -92,7 +97,7 @@ def build_credentials(credential_type: CredentialsType) -> any:
if credential_type == CredentialsType.USER:
credentials = build_user_credentials()
elif credential_type == CredentialsType.SSH:
credentials = build_ssh_credentials()
credentials = build_ssh_credentials(force)
else:
raise ValueError("Unsupported credential type")

Expand All @@ -114,18 +119,21 @@ def build_credentials(credential_type: CredentialsType) -> any:

if response.status_code == 200:
print('Credentials created successfully')
elif not force:
print(f'Error creating credentials status code: {response.status_code}, message: {response.text}, \n retrying with force flag')
build_credentials(credential_type, force=True)
else:
print(f'Error creating credentials: {response.text}')
print(f'Error creating credentials status code: {response.status_code}, message: {response.text}')

def get_ssh() -> str:
def get_ssh(force: bool = False) -> str:
"""
Get SSH credentials
:return: Private key if it exists; otherwise, generates a new one and returns it.
"""
ssh_dir = os.path.expanduser('~/.ssh')
ssh_key_path = os.path.join(ssh_dir, 'id_rsa')

if os.path.exists(ssh_key_path) and os.path.getsize(ssh_key_path) > 0:
if os.path.exists(ssh_key_path) and os.path.getsize(ssh_key_path) > 0 and not force:
try:
with open(ssh_key_path, 'r') as private_key_file:
private_key = private_key_file.read()
Expand All @@ -135,12 +143,15 @@ def get_ssh() -> str:
print(f"Private key not found at {ssh_key_path}. Generating a new key.")
return create_ssh()
else:
print(f"No SSH key found at {ssh_key_path}. Generating a new key.")
return create_ssh()
if force:
print(f"Force flag set. Generating a new key.")
else:
print(f"No SSH key found at {ssh_key_path}. Generating a new key.")
return create_ssh(force)



def create_ssh() -> str:
def create_ssh(force: bool = False) -> str:
"""
Generate an SSH key pair if it does not exist.
:return: The private key as a string. None if the private key is not found.
Expand All @@ -154,10 +165,14 @@ def create_ssh() -> str:
print(f"Directory {ssh_dir} created.")

# Check if the SSH key already exists
if os.path.exists(ssh_key_path) and os.path.getsize(ssh_key_path) > 0:
if os.path.exists(ssh_key_path) and os.path.getsize(ssh_key_path) > 0 and not force:
print(f"SSH key pair already exists at {ssh_key_path}.")
return get_ssh() # Return the existing key

if force:
print("Force flag set. Deleting existing SSH key pair.")
os.remove(ssh_key_path)

print("Generating an SSH key pair...")
script_path = 'helpers/gen_ssh.sh'

Expand Down

0 comments on commit fdc441b

Please sign in to comment.