Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial commit for RAG in py-ml #427

Open
wants to merge 42 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
dbbd4f5
Initial commit for RAG pipeline scripts
hmumtazz Nov 15, 2024
dc94feb
Added Licence Header and fixed .gitingore file
hmumtazz Nov 18, 2024
cba567c
Added comments to understand code
hmumtazz Nov 18, 2024
3481c3e
Remove sensitive config file
hmumtazz Nov 18, 2024
e6eee30
Simplify the selection process for the ef_construction parameter by o…
hmumtazz Nov 18, 2024
8b7aa0e
Allows Customer to register model via CLI, fixed embedding generation…
hmumtazz Nov 21, 2024
2635751
Sign-off on all previous work
hmumtazz Nov 21, 2024
88cf68b
Enhance RAG pipeline functionality and user experience
hmumtazz Nov 22, 2024
ad1ea97
Created Ingest Pipline for chunking.- Merge custom setup.py with the …
hmumtazz Nov 25, 2024
da0ce7c
Removed hard coded LLM model, allowed for Opensoure integration, user…
hmumtazz Nov 29, 2024
fc2ace8
Remove requirements.txt and setup.py from Git tracking
hmumtazz Nov 29, 2024
fe83b25
Update setup.py file
hmumtazz Nov 29, 2024
bea75e0
Remove .gitignore file from rag pipeline directory
hmumtazz Nov 29, 2024
f5b74ed
Organized Model registration into classes
hmumtazz Nov 29, 2024
da834d2
Remove base_model.py from rag pipeline
hmumtazz Nov 29, 2024
9c45e43
Licence header
hmumtazz Nov 29, 2024
a61a840
Updated user setup, Added Semantic search for Managed service using M…
hmumtazz Dec 2, 2024
8db5e9f
fixed failing test in AIConnector tests
hmumtazz Dec 2, 2024
aa42922
Update test_SecretsHelper
hmumtazz Dec 3, 2024
5984c19
fixed license header test_SageMakerModel.py
hmumtazz Dec 3, 2024
4503a55
fixed license header rag.py
hmumtazz Dec 3, 2024
b1b98ab
Added seperate class for embedding generation, removed nominee text p…
hmumtazz Dec 5, 2024
d7e4713
Correctly leveraged existing methods in AI connector class, without h…
hmumtazz Dec 5, 2024
c02fe1b
Removed duplicate method, and deleted unused method
hmumtazz Dec 5, 2024
d877bca
Fixed chunking pipeline, query was not generating due to mismatchd ve…
hmumtazz Dec 5, 2024
f0194ec
Remove config.ini from repository and add to .gitignore
hmumtazz Dec 5, 2024
d354b25
Update rag_setup.py to remove neural search line and serverless
hmumtazz Dec 5, 2024
1ca4221
Updated query.py to reflect search changes
hmumtazz Dec 5, 2024
8b97791
Missing innit file
hmumtazz Dec 5, 2024
fe6f8c5
missing init file
hmumtazz Dec 6, 2024
34f7fd4
Fixed domain ARN/domain name error, changed directory, for IAM, Secre…
hmumtazz Dec 6, 2024
39a1883
Updated AIConnectorHelper to use existing methods, like get task, and…
hmumtazz Dec 6, 2024
2dfa609
Updated License Headers
hmumtazz Dec 6, 2024
2c40ce0
Fixed UT, Ran Lint nox, fixed code and unused enviroment variable iss…
hmumtazz Dec 7, 2024
6a846ce
Deleted serverless.py from rag functionality as we are not using it
hmumtazz Dec 11, 2024
6f17670
deleted unused dependcies, and test.py for serverless
hmumtazz Dec 12, 2024
120169f
Fixed failing test
hmumtazz Dec 13, 2024
91c9af0
Fixed tests
hmumtazz Dec 13, 2024
82e69f5
Fixed tests
hmumtazz Dec 13, 2024
c0114c3
Addressed comments like making code less redudent, and combining methods
hmumtazz Jan 7, 2025
197a134
Addressed code comments, making code less redundent, and combining ex…
hmumtazz Jan 7, 2025
220e9ee
Fixed methods at UT's, adressed comments
hmumtazz Jan 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- updating listing file with three v2 sparse model - by @dhrubo-os ([#412](https://github.com/opensearch-project/opensearch-py-ml/pull/412))
- Update model upload history - opensearch-project/opensearch-neural-sparse-encoding-doc-v2-mini (v.1.0.0)(TORCH_SCRIPT) by @dhrubo-os ([#417](https://github.com/opensearch-project/opensearch-py-ml/pull/417))
- Update model upload history - opensearch-project/opensearch-neural-sparse-encoding-v2-distill (v.1.0.0)(TORCH_SCRIPT) by @dhrubo-os ([#419](https://github.com/opensearch-project/opensearch-py-ml/pull/419))
- Added RAG functionality into `opensearch-py-ml` by @hmumtazz in ([#427](https://github.com/opensearch-project/opensearch-py-ml/pull/427))

### Fixed
- Fix the wrong final zip file name in model_uploader workflow, now will name it by the upload_prefix alse.([#413](https://github.com/opensearch-project/opensearch-py-ml/pull/413/files))
Expand Down
276 changes: 276 additions & 0 deletions opensearch_py_ml/ml_commons/IAMRoleHelper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
# SPDX-License-Identifier: Apache-2.0
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
# Any modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

import json
import logging
import uuid
from datetime import datetime

import boto3
from botocore.exceptions import ClientError


class IAMRoleHelper:
"""
Helper class for managing IAM roles and their interactions with OpenSearch.
"""

def __init__(
self,
region,
opensearch_domain_url=None,
opensearch_domain_username=None,
opensearch_domain_password=None,
):
"""
Initialize the IAMRoleHelper with AWS and OpenSearch configurations.

:param region: AWS region.
:param opensearch_domain_url: URL of the OpenSearch domain.
:param opensearch_domain_username: Username for OpenSearch domain authentication.
:param opensearch_domain_password: Password for OpenSearch domain authentication.
"""
self.region = region
hmumtazz marked this conversation as resolved.
Show resolved Hide resolved
self.opensearch_domain_url = opensearch_domain_url
self.opensearch_domain_username = opensearch_domain_username
self.opensearch_domain_password = opensearch_domain_password

self.iam_client = boto3.client("iam")
self.sts_client = boto3.client("sts", region_name=self.region)

def role_exists(self, role_name):
"""
Check if an IAM role exists.

:param role_name: Name of the IAM role.
:return: True if the role exists, False otherwise.
"""
try:
self.iam_client.get_role(RoleName=role_name)
return True
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchEntity":
print(f"The requested role '{role_name}' does not exist.")
else:
print(f"An error occurred: {e}")
return False

def delete_role(self, role_name):
"""
Delete an IAM role along with its attached policies.

:param role_name: Name of the IAM role to delete.
"""
try:
# Detach any managed policies from the role
policies = self.iam_client.list_attached_role_policies(RoleName=role_name)[
"AttachedPolicies"
]
for policy in policies:
self.iam_client.detach_role_policy(
RoleName=role_name, PolicyArn=policy["PolicyArn"]
)
print(f"All managed policies detached from role '{role_name}'.")

# Delete inline policies associated with the role
inline_policies = self.iam_client.list_role_policies(RoleName=role_name)[
"PolicyNames"
]
for policy_name in inline_policies:
self.iam_client.delete_role_policy(
RoleName=role_name, PolicyName=policy_name
)
print(f"All inline policies deleted from role '{role_name}'.")

# Finally, delete the IAM role
self.iam_client.delete_role(RoleName=role_name)
print(f"Role '{role_name}' deleted.")

except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchEntity":
print(f"Role '{role_name}' does not exist.")
else:
print(f"An error occurred: {e}")

def create_iam_role(
self,
role_name,
trust_policy_json,
inline_policy_json,
policy_name=None,
):
"""
Create a new IAM role with specified trust and inline policies.

:param role_name: Name of the IAM role to create.
:param trust_policy_json: Trust policy document in JSON format.
:param inline_policy_json: Inline policy document in JSON format.
:param policy_name: Optional. If not provided, a unique one will be generated.
:return: ARN of the created role or None if creation failed.
"""
try:
# Create the role with the provided trust policy
create_role_response = self.iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy_json),
Description="Role with custom trust and inline policies",
)

# Retrieve the ARN of the newly created role
role_arn = create_role_response["Role"]["Arn"]

# If policy_name is not provided, generate a unique one
if not policy_name:
timestamp = datetime.utcnow().strftime("%Y%m%d%H%M%S")
policy_name = f"InlinePolicy-{role_name}-{timestamp}"

# Attach the inline policy to the role
self.iam_client.put_role_policy(
RoleName=role_name,
PolicyName=policy_name,
PolicyDocument=json.dumps(inline_policy_json),
)

print(f"Created role: {role_name} with inline policy: {policy_name}")
return role_arn

except ClientError as e:
print(f"Error creating the role: {e}")
return None

def get_role_info(self, role_name, include_details=False):
"""
Retrieve information about an IAM role.

:param role_name: Name of the IAM role.
:param include_details: If False, returns only the role's ARN.
If True, returns a dictionary with full role details.
:return: ARN or dict of role details. Returns None if not found.
"""
if not role_name:
return None

try:
response = self.iam_client.get_role(RoleName=role_name)
role = response["Role"]
role_arn = role["Arn"]

if not include_details:
return role_arn

# Build a detailed dictionary
role_details = {
"RoleName": role["RoleName"],
"RoleId": role["RoleId"],
"Arn": role_arn,
"CreationDate": role["CreateDate"],
"AssumeRolePolicyDocument": role["AssumeRolePolicyDocument"],
"InlinePolicies": {},
}

# List and retrieve any inline policies
list_role_policies_response = self.iam_client.list_role_policies(
RoleName=role_name
)
for policy_name in list_role_policies_response["PolicyNames"]:
get_role_policy_response = self.iam_client.get_role_policy(
RoleName=role_name, PolicyName=policy_name
)
role_details["InlinePolicies"][policy_name] = get_role_policy_response[
"PolicyDocument"
]

return role_details

except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchEntity":
print(f"Role '{role_name}' does not exist.")
else:
print(f"An error occurred: {e}")
return None

def get_user_arn(self, username):
"""
Retrieve the ARN of an IAM user.

:param username: Name of the IAM user.
:return: ARN of the user or None if not found.
"""
if not username:
return None
try:
response = self.iam_client.get_user(UserName=username)
return response["User"]["Arn"]
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchEntity":
print(f"IAM user '{username}' not found.")
else:
print(f"An error occurred: {e}")
return None

def assume_role(self, role_arn, role_session_name=None, session=None):
"""
Assume an IAM role and obtain temporary security credentials.

:param role_arn: ARN of the IAM role to assume.
:param role_session_name: Identifier for the assumed role session.
:param session: Optional boto3 session object. Defaults to the class-level sts_client.
:return: Dictionary with temporary security credentials and metadata, or None on failure.
"""
if not role_arn:
logging.error("Role ARN is required.")
return None

sts_client = session.client("sts") if session else self.sts_client

role_session_name = role_session_name or f"session-{uuid.uuid4()}"

try:
assumed_role_object = sts_client.assume_role(
RoleArn=role_arn,
RoleSessionName=role_session_name,
)

temp_credentials = assumed_role_object["Credentials"]
expiration = temp_credentials["Expiration"]

logging.info(
f"Assumed role: {role_arn}. Temporary credentials valid until: {expiration}"
)

return {
"credentials": {
"AccessKeyId": temp_credentials["AccessKeyId"],
"SecretAccessKey": temp_credentials["SecretAccessKey"],
"SessionToken": temp_credentials["SessionToken"],
},
"expiration": expiration,
"session_name": role_session_name,
}

except ClientError as e:
error_code = e.response["Error"]["Code"]
logging.error(f"Error assuming role {role_arn}: {error_code} - {e}")
return None

def get_iam_user_name_from_arn(self, iam_principal_arn):
hmumtazz marked this conversation as resolved.
Show resolved Hide resolved
"""
Extract the IAM user name from an IAM principal ARN.

:param iam_principal_arn: ARN of the IAM principal. Expected format: arn:aws:iam::<account-id>:user/<user-name>
:return: IAM user name if extraction is successful, None otherwise.
"""
try:
if (
iam_principal_arn
and iam_principal_arn.startswith("arn:aws:iam::")
and ":user/" in iam_principal_arn
):
return iam_principal_arn.split(":user/")[-1]
except Exception as e:
print(f"Error extracting IAM user name: {e}")
return None
112 changes: 112 additions & 0 deletions opensearch_py_ml/ml_commons/SecretsHelper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# SPDX-License-Identifier: Apache-2.0
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
# Any modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

import json
import logging

import boto3
from botocore.exceptions import ClientError

# Configure the logger for this module
logger = logging.getLogger(__name__)


class SecretHelper:
"""
Helper class for managing secrets in AWS Secrets Manager.
Provides methods to check existence, retrieve details, and create secrets.
"""

def __init__(self, region: str):
"""
Initialize the SecretHelper with the specified AWS region.
:param region: AWS region where the Secrets Manager is located.
"""
self.region = region
# Create the Secrets Manager client once at the class level
self.secretsmanager = boto3.client("secretsmanager", region_name=self.region)

def secret_exists(self, secret_name: str) -> bool:
"""
Check if a secret with the given name exists in AWS Secrets Manager.
:param secret_name: Name of the secret to check.
:return: True if the secret exists, False otherwise.
"""
try:
# Attempt to retrieve the secret value
self.secretsmanager.get_secret_value(SecretId=secret_name)
return True
except ClientError as e:
# If the secret does not exist, return False
if e.response["Error"]["Code"] == "ResourceNotFoundException":
return False
else:
# Log other client errors and return False
logger.error(f"An error occurred: {e}")
return False

def get_secret_details(self, secret_name: str, fetch_value: bool = False) -> dict:
"""
Retrieve details of a secret from AWS Secrets Manager.
Optionally fetch the secret value as well.

:param secret_name: Name of the secret.
:param fetch_value: Whether to also fetch the secret value (default is False).
:return: A dictionary with secret details (ARN and optionally the secret value)
or an error dictionary if something went wrong.
"""
try:
# Describe the secret to get its ARN and metadata
describe_response = self.secretsmanager.describe_secret(
SecretId=secret_name
)

secret_details = {
"ARN": describe_response["ARN"],
# You can add more fields from `describe_response` if needed
}

# Fetch the secret value if requested
if fetch_value:
value_response = self.secretsmanager.get_secret_value(
SecretId=secret_name
)
secret_details["SecretValue"] = value_response.get("SecretString")

return secret_details

except ClientError as e:
error_code = e.response["Error"]["Code"]
if error_code == "ResourceNotFoundException":
logger.warning(f"The requested secret '{secret_name}' was not found")
else:
logger.error(
f"An error occurred while fetching secret '{secret_name}': {e}"
)
# Return a dictionary with error details
return {"error": str(e), "error_code": error_code}

def create_secret(self, secret_name: str, secret_value: dict) -> str:
"""
Create a new secret in AWS Secrets Manager.
:param secret_name: Name of the secret to create.
:param secret_value: Dictionary containing the secret data.
:return: ARN of the created secret if successful, None otherwise.
"""
try:
# Create the secret with the provided name and value
response = self.secretsmanager.create_secret(
Name=secret_name,
SecretString=json.dumps(secret_value),
)
# Log success and return the secret's ARN
logger.info(f"Secret '{secret_name}' created successfully.")
return response["ARN"]
except ClientError as e:
# Log errors during secret creation and return None
logger.error(f"Error creating secret '{secret_name}': {e}")
return None
Loading