Skip to content

Commit 36664ee

Browse files
redshift user
1 parent d6aae56 commit 36664ee

8 files changed

Lines changed: 931 additions & 38 deletions

structured-kb-demo/arns.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
import boto3
2-
from botocore.exceptions import ClientError
3-
4-
from logger import logger
51
from vars import (
62
AWS_ACCOUNT_ID,
73
S3_BUCKET,
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import boto3
2+
3+
from vars import AWS_REGION, BEDROCK_KB
4+
5+
bedrock_agent = boto3.client("bedrock-agent", region_name=AWS_REGION)
6+
7+
def get_kb_id_by_name(kb_name):
8+
response = bedrock_agent.list_knowledge_bases(maxResults=10)
9+
for kb in response.get('knowledgeBaseSummaries', []):
10+
if kb['name'] == kb_name:
11+
return kb['knowledgeBaseId']
12+
return None
13+
14+
BEDROCK_KB_ID = get_kb_id_by_name(BEDROCK_KB)

structured-kb-demo/get_redshift_wg_arn.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
import boto3
2+
from botocore.exceptions import ClientError
3+
4+
from logger import logger
5+
from vars import AWS_REGION, REDSHIFT_WORKGROUP
6+
17
try:
28
client = boto3.client("redshift-serverless", region_name=AWS_REGION)
39
response = client.get_workgroup(workgroupName=REDSHIFT_WORKGROUP)

structured-kb-demo/pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,7 @@ readme = "README.md"
66
requires-python = ">=3.13"
77
dependencies = [
88
"boto3>=1.42.97",
9+
"langchain-aws>=1.4.5",
10+
"langgraph>=1.1.10",
911
"python-dotenv>=1.2.2",
1012
]
Lines changed: 92 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,103 @@
1+
import time
12
import boto3
3+
from botocore.exceptions import ClientError
24

3-
from arns import BEDROCK_KB_IAM_ROLE_ARN, REDSHIFT_WORKGROUP_ARN
5+
from arns import BEDROCK_KB_IAM_ROLE_ARN
6+
from get_redshift_wg_arn import REDSHIFT_WORKGROUP_ARN
47
from logger import logger
58
from vars import AWS_REGION, BEDROCK_KB, GLUE_DB, S3_FOLDER
69

7-
GLUE_TABLE = S3_FOLDER
10+
GLUE_TABLE_FULL = f"{GLUE_DB}.{S3_FOLDER}"
811

912
bedrock = boto3.client("bedrock-agent", region_name=AWS_REGION)
1013

11-
bedrock.create_knowledge_base(
12-
name=BEDROCK_KB,
13-
roleArn=BEDROCK_KB_IAM_ROLE_ARN,
14-
knowledgeBaseConfiguration={
15-
"type": "STRUCTURED",
16-
"sqlKnowledgeBaseConfiguration": {
17-
"type": "REDSHIFT",
18-
"redshiftConfiguration": {
19-
"queryEngineConfiguration": {
20-
"type": "SERVERLESS",
21-
"serverlessConfiguration": {
22-
"workgroupArn": REDSHIFT_WORKGROUP_ARN,
23-
"authConfiguration": {
24-
"type": "IAM"
25-
}
14+
def setup_structured_kb():
15+
try:
16+
logger.info(f"Creating Knowledge Base: {BEDROCK_KB}...")
17+
kb_response = bedrock.create_knowledge_base(
18+
name=BEDROCK_KB,
19+
roleArn=BEDROCK_KB_IAM_ROLE_ARN,
20+
knowledgeBaseConfiguration={
21+
"type": "SQL",
22+
"sqlKnowledgeBaseConfiguration": {
23+
"type": "REDSHIFT",
24+
"redshiftConfiguration": {
25+
"queryEngineConfiguration": {
26+
"type": "SERVERLESS",
27+
"serverlessConfiguration": {
28+
"workgroupArn": REDSHIFT_WORKGROUP_ARN,
29+
"authConfiguration": {"type": "IAM"}
30+
}
31+
},
32+
"storageConfigurations": [
33+
{
34+
"type": "AWS_DATA_CATALOG",
35+
"awsDataCatalogConfiguration": {
36+
"tableNames": [GLUE_TABLE_FULL]
37+
}
38+
}
39+
]
2640
}
27-
},
28-
"storageConfigurations": [
29-
{
30-
"type": "AWS_DATA_CATALOG",
31-
"awsDataCatalogConfiguration": {
32-
"tableNames": [f"{GLUE_DB}.{GLUE_TABLE}"]
33-
}
34-
}
35-
]
41+
}
42+
}
43+
)
44+
kb_id = kb_response['knowledgeBase']['knowledgeBaseId']
45+
logger.info(f"Successfully created KB with ID: {kb_id}")
46+
47+
except ClientError as e:
48+
if e.response['Error']['Code'] == 'ConflictException':
49+
logger.info(f"KB {BEDROCK_KB} already exists. Fetching ID...")
50+
# Logic to find existing ID
51+
kbs = bedrock.list_knowledge_bases(maxResults=100)['knowledgeBaseSummaries']
52+
kb_id = next(kb['knowledgeBaseId'] for kb in kbs if kb['name'] == BEDROCK_KB)
53+
else:
54+
raise e
55+
56+
try:
57+
logger.info("Connecting Redshift Metadata Data Source...")
58+
ds_response = bedrock.create_data_source(
59+
knowledgeBaseId=kb_id,
60+
name=f"{BEDROCK_KB}-metadata-source",
61+
dataSourceConfiguration={
62+
"type": "REDSHIFT_METADATA"
3663
}
37-
}
38-
}
39-
)
64+
)
65+
ds_id = ds_response['dataSource']['dataSourceId']
66+
logger.info(f"Data Source Created: {ds_id}")
67+
68+
except ClientError as e:
69+
if e.response['Error']['Code'] == 'ConflictException':
70+
logger.info("Data Source already exists. Fetching ID...")
71+
sources = bedrock.list_data_sources(knowledgeBaseId=kb_id)['dataSourceSummaries']
72+
ds_id = sources[0]['dataSourceId']
73+
else:
74+
raise e
75+
76+
# TRIGGER SYNC (INGESTION)
77+
logger.info("Starting Metadata Ingestion Job (Sync)...")
78+
ingest_response = bedrock.start_ingestion_job(
79+
knowledgeBaseId=kb_id,
80+
dataSourceId=ds_id
81+
)
82+
job_id = ingest_response['ingestionJob']['ingestionJobId']
83+
84+
# WAIT FOR SYNC
85+
while True:
86+
job = bedrock.get_ingestion_job(
87+
knowledgeBaseId=kb_id,
88+
dataSourceId=ds_id,
89+
ingestionJobId=job_id
90+
)
91+
status = job['ingestionJob']['status']
92+
logger.info(f"Sync Status: {status}")
93+
94+
if status in ['COMPLETE', 'FAILED', 'STOPPED']:
95+
if status == 'FAILED':
96+
logger.error(f"Sync failed. Reasons: {job['ingestionJob'].get('failureReasons')}")
97+
break
98+
time.sleep(10)
99+
100+
logger.info("Knowledge Base is now fully ready for SQL queries.")
40101

41-
logger.info(f"KB Created: {BEDROCK_KB} with Redshift Serverless as data source.")
102+
if __name__ == "__main__":
103+
setup_structured_kb()

structured-kb-demo/setup_bedrock_kb_iam_policy.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import boto3
22
import json
33

4+
from arns import S3_BUCKET_ARN
5+
from get_redshift_wg_arn import REDSHIFT_WORKGROUP_ARN
46
from logger import logger
5-
from vars import AWS_ACCOUNT_ID, BEDROCK_KB_IAM_POLICY
7+
from vars import AWS_ACCOUNT_ID, AWS_REGION, BEDROCK_KB_IAM_POLICY, GLUE_DB, S3_BUCKET, S3_FOLDER
68

9+
GLUE_TABLE = S3_FOLDER
710
iam = boto3.client("iam")
811

912
# Define the policy document
@@ -32,15 +35,15 @@
3235
"redshift-data:ExecuteStatement"
3336
],
3437
"Resource": [
35-
f"arn:aws:redshift-serverless:us-east-1:{AWS_ACCOUNT_ID}:workgroup/*"
38+
REDSHIFT_WORKGROUP_ARN
3639
]
3740
},
3841
{
3942
"Sid": "RedshiftServerlessGetCredentials",
4043
"Effect": "Allow",
4144
"Action": "redshift-serverless:GetCredentials",
4245
"Resource": [
43-
f"arn:aws:redshift-serverless:us-east-1:{AWS_ACCOUNT_ID}:workgroup/*"
46+
REDSHIFT_WORKGROUP_ARN
4447
]
4548
},
4649
{
@@ -61,6 +64,35 @@
6164
"bedrock:GenerateQuery"
6265
],
6366
"Resource": "*"
67+
},
68+
{
69+
"Sid": "VisualEditor0",
70+
"Effect": "Allow",
71+
"Action": [
72+
"glue:GetDatabases",
73+
"glue:GetDatabase",
74+
"glue:GetTables",
75+
"glue:GetTable",
76+
"glue:GetPartitions",
77+
"glue:GetPartition",
78+
"glue:SearchTables"
79+
],
80+
"Resource": [
81+
f"arn:aws:glue:{AWS_REGION}:{AWS_ACCOUNT_ID}:table/{GLUE_DB}/{GLUE_TABLE}",
82+
f"arn:aws:glue:{AWS_REGION}:{AWS_ACCOUNT_ID}:database/{GLUE_DB}",
83+
f"arn:aws:glue:{AWS_REGION}:{AWS_ACCOUNT_ID}:catalog"
84+
]
85+
},
86+
{
87+
"Effect": "Allow",
88+
"Action": [
89+
"s3:ListBucket",
90+
"s3:GetObject"
91+
],
92+
"Resource": [
93+
f"{S3_BUCKET_ARN}",
94+
f"{S3_BUCKET_ARN}/*"
95+
]
6496
}
6597
]
6698
}
@@ -76,6 +108,17 @@
76108
logger.info(f"Successfully created policy!")
77109

78110
except iam.exceptions.EntityAlreadyExistsException:
79-
logger.info(f"Policy already exists.")
111+
logger.info(f"Policy already exists, updating it.")
112+
# Get the policy ARN
113+
policies = iam.list_policies(Scope='Local')
114+
policy_arn = next(p['Arn'] for p in policies['Policies'] if p['PolicyName'] == BEDROCK_KB_IAM_POLICY)
115+
116+
# Create a new version and set it as default
117+
iam.create_policy_version(
118+
PolicyArn=policy_arn,
119+
PolicyDocument=json.dumps(policy_document),
120+
SetAsDefault=True
121+
)
122+
logger.info(f"Successfully updated policy!")
80123
except Exception as e:
81124
logger.error(f"An error occurred: {e}")
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import time
2+
3+
import boto3
4+
5+
from logger import logger
6+
from vars import AWS_REGION, BEDROCK_KB_IAM_ROLE, GLUE_DB, REDSHIFT_WORKGROUP
7+
8+
redshift_user = f"IAMR:{BEDROCK_KB_IAM_ROLE}"
9+
10+
# Initialize Redshift Data Client
11+
client = boto3.client("redshift-data", region_name=AWS_REGION)
12+
13+
# Define the SQL commands
14+
sql_statements = [
15+
f'CREATE USER "{redshift_user}" WITH PASSWORD DISABLE;',
16+
f'GRANT USAGE ON DATABASE awsdatacatalog TO "IAMR:StructKbIamRole";',
17+
]
18+
19+
try:
20+
# Execute as a batch
21+
response = client.batch_execute_statement(
22+
WorkgroupName=REDSHIFT_WORKGROUP,
23+
Database="awsdatacatalog",
24+
Sqls=sql_statements
25+
)
26+
27+
execution_id = response['Id']
28+
logger.info(f"Execution started. ID: {execution_id}")
29+
30+
# (Optional) Wait for completion
31+
while True:
32+
status = client.describe_statement(Id=execution_id)
33+
state = status['Status']
34+
if state in ['FINISHED', 'FAILED', 'ABORTED']:
35+
logger.info(f"Final Status: {state}")
36+
if state == 'FAILED':
37+
logger.error(f"Error: {status.get('Error')}")
38+
break
39+
time.sleep(2)
40+
41+
except Exception as e:
42+
logger.error(f"Failed to execute: {e}")

0 commit comments

Comments
 (0)