AWSTemplateFormatVersion: "2010-09-09"
Description: "Aurora DSQL Schema Analyzer - AgentCore with Knowledge Base and Lambda Functions"
Parameters:
# Database Configuration
SourceSecretArn:
Type: String
Description: ARN of Secrets Manager secret for source database credentials
# Network Configuration
LambdaVpcId:
Type: AWS::EC2::VPC::Id
Description: VPC ID for Lambda functions
LambdaSubnetId:
Type: AWS::EC2::Subnet::Id
Description: Subnet ID for Lambda functions
LambdaSecurityGroupId:
Type: AWS::EC2::SecurityGroup::Id
Description: Security Group ID for Lambda functions
# Lambda Layers
PyMySQLLayerArn:
Type: String
Default: ""
Description: ARN of PyMySQL Lambda Layer - Required only for MySQL/MariaDB source
Psycopg2LayerArn:
Type: String
Default: ""
Description: ARN of Psycopg2 Lambda Layer - Required only for PostgreSQL source
# Knowledge Base Configuration
OpenSearchCollectionArn:
Type: String
Description: ARN of OpenSearch Serverless Collection
OpenSearchIndexName:
Type: String
Description: Name of OpenSearch Index
Default: bedrock-knowledge-base-default-index
KnowledgeBaseRoleArn:
Type: String
Description: ARN of Knowledge Base IAM Role
# Model Configuration
FoundationModelId:
Type: String
Description: Foundation model ID, cross-Region inference profile ID, or full ARN (e.g., anthropic.claude-3-haiku-20240307-v1:0, us.anthropic.claude-haiku-4-5-20251001-v1:0, or arn:aws:bedrock:us-east-1:123456789012:inference-profile/us.anthropic.claude-haiku-4-5-20251001-v1:0)
Default: us.anthropic.claude-haiku-4-5-20251001-v1:0
EmbeddingModelId:
Type: String
Description: Embedding model ID for the Knowledge Base
Default: amazon.titan-embed-text-v2:0
# AgentCore Configuration
AgentName:
Type: String
Default: "dsqlanalyzer"
Description: "Name for the AgentCore agent"
AllowedPattern: "^[a-zA-Z][a-zA-Z0-9_]{0,47}$"
ImageTag:
Type: String
Default: "latest"
Description: "Tag for the Docker image"
NetworkMode:
Type: String
Default: "PUBLIC"
Description: "Network mode for AgentCore resources"
AllowedValues:
- PUBLIC
- VPC
# AgentCore VPC Parameters (required for VPC mode)
AgentCoreVpcId:
Type: String
Default: ''
Description: (Optional) VPC ID for AgentCore - required if NetworkMode is VPC
AgentCoreSubnetIds:
Type: CommaDelimitedList
Default: ''
Description: (Optional) Subnet IDs for AgentCore (comma-delimited) - required if NetworkMode is VPC
AgentCoreSecurityGroupId:
Type: String
Default: ''
Description: (Optional) Security group for AgentCore - required if NetworkMode is VPC
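# Conditions: distinguish a full model/inference-profile ARN from a bare model ID and from a
# cross-Region inference profile ID (us. or eu. prefix), and toggle the optional driver layers
# and VPC networking for AgentCore.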
Conditions:
IsFullArn: !Equals [!Select [0, !Split [":", !Ref FoundationModelId]], "arn"]
IsInferenceProfile: !Or
- !Equals [!Select [0, !Split [".", !Ref FoundationModelId]], "us"]
- !Equals [!Select [0, !Split [".", !Ref FoundationModelId]], "eu"]
HasPyMySQLLayer: !Not [!Equals [!Ref PyMySQLLayerArn, ""]]
HasPsycopg2Layer: !Not [!Equals [!Ref Psycopg2LayerArn, ""]]
UsePrivateNetwork: !Equals [!Ref NetworkMode, 'VPC']
Resources:
# SECURITY SECTION
EncryptionKey:
Type: AWS::KMS::Key
Properties:
Description: KMS key for encrypting logs, CodeBuild, and ECR
KeyPolicy:
Version: '2012-10-17'
Statement:
- Sid: Enable IAM User Permissions
Effect: Allow
Principal:
AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
Action: 'kms:*'
Resource: '*'
- Sid: Allow Service Usage
Effect: Allow
Principal:
AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
Action:
- 'kms:Encrypt'
- 'kms:Decrypt'
- 'kms:ReEncrypt*'
- 'kms:GenerateDataKey*'
- 'kms:CreateGrant'
- 'kms:DescribeKey'
Resource: '*'
Condition:
StringEquals:
'kms:ViaService':
- !Sub 'secretsmanager.${AWS::Region}.amazonaws.com'
- !Sub 'kinesis.${AWS::Region}.amazonaws.com'
- !Sub 'logs.${AWS::Region}.amazonaws.com'
- !Sub 'sqs.${AWS::Region}.amazonaws.com'
- !Sub 'ecr.${AWS::Region}.amazonaws.com'
- Sid: Allow CloudWatch Logs
Effect: Allow
Principal:
Service: !Sub 'logs.${AWS::Region}.amazonaws.com'
Action:
- 'kms:Encrypt'
- 'kms:Decrypt'
- 'kms:ReEncrypt*'
- 'kms:GenerateDataKey*'
- 'kms:CreateGrant'
- 'kms:DescribeKey'
Resource: '*'
Condition:
ArnLike:
'kms:EncryptionContext:aws:logs:arn': !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*'
- Sid: Allow CodeBuild
Effect: Allow
Principal:
Service: codebuild.amazonaws.com
Action:
- 'kms:Encrypt'
- 'kms:Decrypt'
- 'kms:ReEncrypt*'
- 'kms:GenerateDataKey*'
- 'kms:CreateGrant'
- 'kms:DescribeKey'
Resource: '*'
EncryptionKeyAlias:
Type: AWS::KMS::Alias
Properties:
AliasName: !Sub 'alias/${AWS::StackName}-encryption-key'
TargetKeyId: !Ref EncryptionKey
# KNOWLEDGE BASE SECTION
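# Vector knowledge base backed by the OpenSearch Serverless collection; the 1024 embedding
# dimensions and the field mapping below must match the vector index created in that collection.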
DSQLKnowledgeBase:
Type: AWS::Bedrock::KnowledgeBase
Properties:
Name: !Sub ${AWS::StackName}-knowledge-base
Description: Aurora DSQL specific syntax recommendations and supported datatypes
RoleArn: !Ref KnowledgeBaseRoleArn
KnowledgeBaseConfiguration:
Type: VECTOR
VectorKnowledgeBaseConfiguration:
EmbeddingModelArn: !Sub arn:aws:bedrock:${AWS::Region}::foundation-model/${EmbeddingModelId}
EmbeddingModelConfiguration:
BedrockEmbeddingModelConfiguration:
Dimensions: 1024
EmbeddingDataType: FLOAT32
StorageConfiguration:
Type: OPENSEARCH_SERVERLESS
OpensearchServerlessConfiguration:
CollectionArn: !Ref OpenSearchCollectionArn
VectorIndexName: !Ref OpenSearchIndexName
FieldMapping:
VectorField: bedrock-knowledge-base-default-vector
TextField: AMAZON_BEDROCK_TEXT
MetadataField: AMAZON_BEDROCK_METADATA
DSQLDataSource:
Type: AWS::Bedrock::DataSource
Properties:
Name: !Sub ${AWS::StackName}-datasource
KnowledgeBaseId: !Ref DSQLKnowledgeBase
DataSourceConfiguration:
Type: WEB
WebConfiguration:
SourceConfiguration:
UrlConfiguration:
SeedUrls:
- Url: https://docs.aws.amazon.com/aurora-dsql/latest/userguide/working-with-postgresql-compatibility-supported-data-types.html
- Url: https://docs.aws.amazon.com/aurora-dsql/latest/userguide/working-with-postgresql-compatibility-supported-sql-features.html
- Url: https://docs.aws.amazon.com/aurora-dsql/latest/userguide/working-with-postgresql-compatibility-supported-sql-subsets.html
- Url: https://docs.aws.amazon.com/aurora-dsql/latest/userguide/working-with-postgresql-compatibility-unsupported-features.html
- Url: https://docs.aws.amazon.com/aurora-dsql/latest/userguide/working-with-create-index-async.html
- Url: https://docs.aws.amazon.com/aurora-dsql/latest/userguide/sequence-functions-syntax-support.html
- Url: https://docs.aws.amazon.com/aurora-dsql/latest/userguide/create-sequence-syntax-support.html
- Url: https://docs.aws.amazon.com/aurora-dsql/latest/userguide/sequences-identity-columns-overview.html
- Url: https://docs.aws.amazon.com/aurora-dsql/latest/userguide/sequences-identity-columns-working-with.html
- Url: https://docs.aws.amazon.com/aurora-dsql/latest/userguide/working-with-postgresql-compatibility-migration-guide.html
CrawlerConfiguration:
CrawlerLimits:
RateLimit: 300
DataDeletionPolicy: DELETE
# LAMBDA SECTION - DDL Extraction Functions
LambdaExecutionRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub ${AWS::StackName}-LambdaRole
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
- !Ref SecretsManagerAccessPolicy
- !Ref KMSAccessPolicy
MySQLAnalysisFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub ${AWS::StackName}-mysql-analysis-dsql
Runtime: python3.14
Handler: index.lambda_handler
Role: !GetAtt LambdaExecutionRole.Arn
Timeout: 900
MemorySize: 128
Environment:
Variables:
SOURCE_SECRET_ARN: !Ref SourceSecretArn
VpcConfig:
SecurityGroupIds:
- !Ref LambdaSecurityGroupId
SubnetIds:
- !Ref LambdaSubnetId
ReservedConcurrentExecutions: 10
Layers: !If
- HasPyMySQLLayer
- [!Ref PyMySQLLayerArn]
- !Ref AWS::NoValue
Code:
ZipFile: |
# Placeholder - Replace with actual MySQL DDL extraction code
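# A real implementation would read connection credentials from the secret referenced by
# SOURCE_SECRET_ARN (AWS Secrets Manager), connect over the VPC using PyMySQL from the
# optional layer, and return the extracted CREATE TABLE/INDEX/VIEW statements.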
import json
def lambda_handler(event, context):
return {
'statusCode': 200,
'body': json.dumps({'message': 'MySQL analysis function'})
}
PostgreSQLAnalysisFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub ${AWS::StackName}-postgresql-analysis-dsql
Runtime: python3.14
Handler: index.lambda_handler
Role: !GetAtt LambdaExecutionRole.Arn
Timeout: 900
MemorySize: 128
Environment:
Variables:
POSTGRESQL_SECRET_ARN: !Ref SourceSecretArn
VpcConfig:
SecurityGroupIds:
- !Ref LambdaSecurityGroupId
SubnetIds:
- !Ref LambdaSubnetId
ReservedConcurrentExecutions: 10
Layers: !If
- HasPsycopg2Layer
- [!Ref Psycopg2LayerArn]
- !Ref AWS::NoValue
Code:
ZipFile: |
# Placeholder - Replace with actual PostgreSQL DDL extraction code
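# A real implementation would read connection credentials from the secret referenced by
# POSTGRESQL_SECRET_ARN, connect over the VPC using psycopg2 from the optional layer, and
# return DDL for tables, indexes, sequences, functions, triggers, and views.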
import json
def lambda_handler(event, context):
return {
'statusCode': 200,
'body': json.dumps({'message': 'PostgreSQL analysis function'})
}
# ECR SECTION
ECRRepository:
Type: AWS::ECR::Repository
DeletionPolicy: Delete
UpdateReplacePolicy: Delete
Properties:
RepositoryName: !Sub "${AWS::StackName}-dsql-analyzer"
ImageTagMutability: IMMUTABLE
EncryptionConfiguration:
EncryptionType: KMS
KmsKey: !GetAtt EncryptionKey.Arn
EmptyOnDelete: true
ImageScanningConfiguration:
ScanOnPush: true
# IAM MANAGED POLICIES
SecretsManagerAccessPolicy:
Type: AWS::IAM::ManagedPolicy
Properties:
Description: Allow Lambda to read source database secrets
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: secretsmanager:GetSecretValue
Resource: !Ref SourceSecretArn
KMSAccessPolicy:
Type: AWS::IAM::ManagedPolicy
Properties:
Description: Allow Lambda to use KMS encryption
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- kms:Decrypt
- kms:GenerateDataKey
Resource: !GetAtt EncryptionKey.Arn
CodeBuildExecutionPolicy:
Type: AWS::IAM::ManagedPolicy
Properties:
Description: Execution policy for CodeBuild
PolicyDocument:
Version: "2012-10-17"
Statement:
- Sid: CloudWatchLogs
Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/codebuild/${AWS::StackName}-*"
- Sid: ECRAccess
Effect: Allow
Action:
- ecr:BatchCheckLayerAvailability
- ecr:GetDownloadUrlForLayer
- ecr:BatchGetImage
- ecr:GetAuthorizationToken
- ecr:PutImage
- ecr:InitiateLayerUpload
- ecr:UploadLayerPart
- ecr:CompleteLayerUpload
Resource:
- !GetAtt ECRRepository.Arn
- "*"
CustomResourceExecutionPolicy:
Type: AWS::IAM::ManagedPolicy
Properties:
Description: Execution policy for custom resource Lambda
PolicyDocument:
Version: "2012-10-17"
Statement:
- Sid: CodeBuildAccess
Effect: Allow
Action:
- codebuild:StartBuild
- codebuild:BatchGetBuilds
Resource: !GetAtt AgentImageBuildProject.Arn
- Sid: KMSAccess
Effect: Allow
Action:
- kms:Decrypt
- kms:GenerateDataKey
Resource: !GetAtt EncryptionKey.Arn
# IAM SECTION - AgentCore Roles
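# Execution role assumed by the AgentCore runtime; the SourceAccount/SourceArn conditions limit
# who can assume it (confused-deputy protection), and the inline policy grants only the ECR,
# CloudWatch Logs, Bedrock, Lambda, and OpenSearch access the agent needs.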
AgentExecutionRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub "${AWS::StackName}-agent-execution-role"
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service: bedrock-agentcore.amazonaws.com
Action: sts:AssumeRole
Condition:
StringEquals:
aws:SourceAccount: !Ref AWS::AccountId
ArnLike:
aws:SourceArn: !Sub "arn:aws:bedrock-agentcore:${AWS::Region}:${AWS::AccountId}:*"
ManagedPolicyArns:
- arn:aws:iam::aws:policy/BedrockAgentCoreFullAccess
Policies:
- PolicyName: AgentCoreExecutionPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Sid: ECRImageAccess
Effect: Allow
Action:
- ecr:BatchGetImage
- ecr:GetDownloadUrlForLayer
- ecr:BatchCheckLayerAvailability
- ecr:GetAuthorizationToken
Resource:
- !GetAtt ECRRepository.Arn
- "*"
- Sid: CloudWatchLogs
Effect: Allow
Action:
- logs:DescribeLogStreams
- logs:CreateLogGroup
- logs:DescribeLogGroups
- logs:CreateLogStream
- logs:PutLogEvents
Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/*"
- Sid: BedrockModelInvocation
Effect: Allow
Action:
- bedrock:InvokeModel
- bedrock:InvokeModelWithResponseStream
- bedrock:Retrieve
- bedrock:RetrieveAndGenerate
Resource:
- !If
- IsFullArn
- !Ref FoundationModelId
- !If
- IsInferenceProfile
- !Sub 'arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:inference-profile/${FoundationModelId}'
- !Sub 'arn:aws:bedrock:${AWS::Region}::foundation-model/${FoundationModelId}'
- !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:knowledge-base/*"
- Sid: LambdaInvocation
Effect: Allow
Action: lambda:InvokeFunction
Resource:
- !GetAtt MySQLAnalysisFunction.Arn
- !GetAtt PostgreSQLAnalysisFunction.Arn
- Sid: KnowledgeBaseAccess
Effect: Allow
Action:
- bedrock:Retrieve
- bedrock:RetrieveAndGenerate
Resource: !Sub "arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:knowledge-base/${DSQLKnowledgeBase}"
- Sid: OpenSearchAccess
Effect: Allow
Action: aoss:APIAccessAll
Resource: !Ref OpenSearchCollectionArn
CodeBuildRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub "${AWS::StackName}-codebuild-role"
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service: codebuild.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- !Ref CodeBuildExecutionPolicy
CustomResourceRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub "${AWS::StackName}-custom-resource-role"
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
- !Ref CustomResourceExecutionPolicy
# LAMBDA SECTION - Custom Resources
CodeBuildTriggerFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub "${AWS::StackName}-codebuild-trigger"
Handler: index.handler
Role: !GetAtt CustomResourceRole.Arn
Runtime: python3.14
Timeout: 900
ReservedConcurrentExecutions: 5
Code:
ZipFile: |
import boto3, json, logging, time
import cfnresponse
logger = logging.getLogger()
logger.setLevel(logging.INFO)
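# Custom-resource handler: on Create/Update it starts the CodeBuild project, polls the build
# status until it reaches a terminal state, and signals CloudFormation via cfnresponse;
# Delete requests are acknowledged immediately.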
def handler(event, context):
logger.info('Received event: %s', json.dumps(event))
try:
if event['RequestType'] == 'Delete':
cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
return
project_name = event['ResourceProperties']['ProjectName']
codebuild = boto3.client('codebuild')
response = codebuild.start_build(projectName=project_name)
build_id = response['build']['id']
logger.info(f"Started build: {build_id}")
max_wait_time = context.get_remaining_time_in_millis() / 1000 - 30
start_time = time.time()
while True:
if time.time() - start_time > max_wait_time:
cfnresponse.send(event, context, cfnresponse.FAILED, {'Error': 'Build timeout'})
return
build_response = codebuild.batch_get_builds(ids=[build_id])
build_status = build_response['builds'][0]['buildStatus']
if build_status == 'SUCCEEDED':
cfnresponse.send(event, context, cfnresponse.SUCCESS, {'BuildId': build_id})
return
elif build_status in ['FAILED', 'FAULT', 'STOPPED', 'TIMED_OUT']:
cfnresponse.send(event, context, cfnresponse.FAILED, {'Error': f'Build failed: {build_status}'})
return
time.sleep(30)
except Exception as e:
logger.error('Error: %s', str(e))
cfnresponse.send(event, context, cfnresponse.FAILED, {'Error': str(e)})
# CODEBUILD SECTION - Container Image Building
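# Builds the agent container image and pushes it to ECR. The ARM build environment matches the
# linux/arm64 image expected by the AgentCore runtime.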
AgentImageBuildProject:
Type: AWS::CodeBuild::Project
Properties:
Name: !Sub "${AWS::StackName}-dsql-analyzer-build"
ServiceRole: !GetAtt CodeBuildRole.Arn
EncryptionKey: !GetAtt EncryptionKey.Arn
Artifacts:
Type: NO_ARTIFACTS
Environment:
Type: ARM_CONTAINER
ComputeType: BUILD_GENERAL1_LARGE
Image: aws/codebuild/amazonlinux2-aarch64-standard:3.0
PrivilegedMode: true
EnvironmentVariables:
- Name: AWS_DEFAULT_REGION
Value: !Ref AWS::Region
- Name: AWS_ACCOUNT_ID
Value: !Ref AWS::AccountId
- Name: IMAGE_REPO_NAME
Value: !Ref ECRRepository
- Name: IMAGE_TAG
Value: !Ref ImageTag
- Name: KNOWLEDGE_BASE_ID
Value: !Ref DSQLKnowledgeBase
- Name: MYSQL_LAMBDA_ARN
Value: !GetAtt MySQLAnalysisFunction.Arn
- Name: POSTGRESQL_LAMBDA_ARN
Value: !GetAtt PostgreSQLAnalysisFunction.Arn
- Name: FOUNDATION_MODEL_ID
Value: !If
- IsFullArn
- !Ref FoundationModelId
- !If
- IsInferenceProfile
- !Sub 'arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:inference-profile/${FoundationModelId}'
- !Sub 'arn:aws:bedrock:${AWS::Region}::foundation-model/${FoundationModelId}'
Source:
Type: NO_SOURCE
BuildSpec: !Sub |
version: 0.2
phases:
pre_build:
commands:
- echo Logging in to Amazon ECR...
- aws ecr get-login-password --region $AWS_DEFAULT_REGION | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com
build:
commands:
- echo Building DSQL Analyzer Agent...
- |
cat > requirements.txt << 'EOF'
strands-agents
strands-agents-tools
boto3
bedrock-agentcore
bedrock-agentcore-starter-toolkit
pydantic
inputimeout
python-dotenv
EOF
- |
cat > dsql_agent.py << 'AGENTEOF'
from strands.telemetry import StrandsTelemetry
from strands.types.content import Message
from strands.models import BedrockModel
from strands.agent.conversation_manager import SlidingWindowConversationManager
from strands import Agent, tool
import json
import sys
import os
import re
import io
import uuid
import asyncio
from typing import Union, Optional, Annotated, Dict, List, Any, Literal
from inputimeout import inputimeout, TimeoutOccurred
from pydantic import BaseModel, Field
import boto3
from dotenv import load_dotenv
from bedrock_agentcore.runtime.context import RequestContext
from bedrock_agentcore import BedrockAgentCoreApp
load_dotenv()
app = BedrockAgentCoreApp()
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Environment variables
KNOWLEDGE_BASE_ID = os.getenv('KNOWLEDGE_BASE_ID')
MYSQL_LAMBDA_ARN = os.getenv('MYSQL_LAMBDA_ARN')
POSTGRESQL_LAMBDA_ARN = os.getenv('POSTGRESQL_LAMBDA_ARN')
REGION = os.getenv('AWS_REGION', 'us-east-1')
MODEL_ID = os.getenv('FOUNDATION_MODEL_ID')
llm_ORCHESTRATION = BedrockModel(
model_id=MODEL_ID,
region_name=REGION,
temperature=0,
max_tokens=55000,
stop_sequences=[],
top_k=50
)
ORCHESTRATION_TEMPLATE = """
$You are an Aurora DSQL compatibility analysis agent that analyzes MySQL/MariaDB or PostgreSQL databases for Aurora DSQL compatibility.
Critical Workflow:
1. ALWAYS Extract Real DDL First:
- When users request database analysis, immediately call the appropriate action group:
- For MySQL/MariaDB analysis: use the action_group_mysql action group
- For PostgreSQL analysis: use the action_group_postgresql action group
- These functions connect to live databases and extract actual DDL statements.
IMPORTANT: Database credentials are already configured via AWS Secrets Manager in the functions' environment variables. Users do NOT need to provide secret ARNs unless they want to connect to a different database.
- **NEVER use example DDL from the knowledge base**
- Always get the precise DDL including ALL columns, ALL constraints, ALL collations, ALL defaults exactly as they exist in the source.
2. Analyze Using Actual DDL:
- Use ONLY the DDL returned by action group functions
- Check data types and syntax against Aurora DSQL-specific compatibility requirements
- Reference the knowledge base for:
- Aurora DSQL supported data types
- Conversion recommendations
MANDATORY AURORA DSQL SYNTAX RULES (apply to every converted DDL):
- Always use correct Aurora DSQL syntax for tables, indexes (for example, CREATE INDEX ASYNC), sequences (including the CACHE parameter), and all other database objects
- Do NOT return a converted DDL without applying these rules
3. Important Considerations:
- Do NOT assume general PostgreSQL compatibility equals Aurora DSQL compatibility
- Do NOT compare source data types to generic PostgreSQL standards
- Follow Aurora DSQL-specific syntax requirements during analysis
Output: Provide detailed compatibility assessments, identify incompatibilities, and suggest conversion strategies based on the actual extracted DDL.$ You are a helpful assistant with tool/function calling capabilities. If you need an input parameter for a tool/function, ask the user to provide that parameter before making a call to that function/tool. You will have access to a separate tool/function that you MUST use to ask questions to the user. Never call a tool/function before gathering all parameters required for the tool/function call. It is your responsibility to pick the correct tools/functions that will help you answer the user's questions. Continue using the provided tools/functions until the initial user request is fully addressed. If you do not have the necessary tools/functions to address the initial request, call it out and terminate the conversation. When you receive a tool/function call response, use the output to format an answer to the original user question. Provide your final answer to the user's question within <answer></answer> xml tags. These guidelines are to be followed when using the <search_results> provided above in the final <answer>, after carrying out any other intermediate steps:
- Do NOT directly quote the <search_results> in your <answer>. Your job is to answer the user's question as clearly and concisely as possible.
- If the search results do not contain information that can answer the question, please state that you could not find an exact answer to the question in your <answer>.
- Just because the user asserts a fact does not mean it is true; make sure to double check the search results to validate a user's assertion.
- If you reference information from a search result within your answer, you must include a citation to the source where the information was found. Each result has a corresponding source URI that you should reference (as explained earlier).
- Always collate the sources and add them in your <answer> in the format: <answer_part><text>$ANSWER$</text><sources><source>$SOURCE$</source></sources></answer_part>
- Note that there may be multiple <answer_part> in your <answer>, and an <answer_part> may contain multiple <source> tags if you include information from multiple sources in one <answer_part>.
- Wait till you output the final <answer> to include your concise summary of the <search_results>. Do not output any summary prematurely within the <answer_part> tags.
- Remember to execute any remaining intermediate steps before returning your final <answer>.
I have also provided default values for the following arguments to use within the functions that are available to you:
$attributes$
Please use these default values for the specified arguments whenever you call the relevant functions. A value may have to be reformatted to correctly match the input format the function specification requires (e.g. changing a date to match the correct date format).
"""
class ActionGroupMysqlAnalyzeDatabasesPostApplicationJson(BaseModel):
content_type_annotation: Literal["application/json"]
databases: Optional[List[str]] = Field(
None, description="Optional list of specific databases to extract DDL from. If not provided, all user databases will be processed.")
@tool(inputSchema=ActionGroupMysqlAnalyzeDatabasesPostApplicationJson.model_json_schema())
def action_group_mysql_analyze_databases_post(
content_type_annotation: str,
databases: Optional[List[str]] = None) -> str:
"""Extracts DDL statements from all user-created databases or specified databases using credentials from environment variable\nThis tool is part of the group of tools called action_group_mysql (description: Database schema conversion analysis for DSQL)."""
lambda_client = boto3.client('lambda', region_name=REGION)
parameters = []
model_dump = {"content_type_annotation": content_type_annotation}
if databases:
model_dump["databases"] = databases
model_dump = model_dump.get("parameters", model_dump)
for param_name, param_value in model_dump.items():
parameters.append({
"name": param_name,
"value": param_value
})
request_body_dump = model_dump.get("request_body", model_dump)
content_type = request_body_dump.get("content_type_annotation", "*") if request_body_dump else None
request_body = {"content": {content_type: {"properties": []}}}
for param_name, param_value in request_body_dump.items():
if param_name != "content_type_annotation":
request_body["content"][content_type]["properties"].append({
"name": param_name,
"value": param_value
})
try:
payload = {
"messageVersion": "1.0",
"agent": {
"name": "dsql-analyzer-agent",
"id": "AGENTCORE",
"alias": "LATEST",
"version": "DRAFT"
},
"sessionId": "",
"sessionAttributes": {},
"promptSessionAttributes": {},
"actionGroup": "action_group_mysql",
"apiPath": "/analyze-databases",
"inputText": last_input,
"httpMethod": "POST",
"parameters": parameters
}
if request_body:
payload["requestBody"] = request_body
response = lambda_client.invoke(
FunctionName=MYSQL_LAMBDA_ARN,
InvocationType='RequestResponse',
Payload=json.dumps(payload)
)
response_payload = json.loads(response['Payload'].read().decode('utf-8'))
return str(response_payload)
except Exception as e:
return f"Error executing analyze_databases/post: {str(e)}"
class ActionGroupPostgresqlAnalyzePostgresqlPostApplicationJson(BaseModel):
content_type_annotation: Literal["application/json"]
database_name: Optional[str] = Field(
None,
description="Optional specific database name to analyze. If not provided, uses the default database from the secret")
specific_tables: Optional[List[str]] = Field(
None, description="Optional list of specific table names to extract. If not provided, extracts all tables")
@tool(inputSchema=ActionGroupPostgresqlAnalyzePostgresqlPostApplicationJson.model_json_schema())
def action_group_postgresql_analyze_postgresql_post(
content_type_annotation: str,
database_name: Optional[str] = None,
specific_tables: Optional[List[str]] = None) -> str:
"""Connects to PostgreSQL database and extracts complete DDL statements for all database objects including tables, indexes, sequences, functions, procedures, triggers, and views\nThis tool is part of the group of tools called action_group_postgresql (description: PostgreSQL database schema analysis and extraction)."""
lambda_client = boto3.client('lambda', region_name=REGION)
parameters = []
model_dump = {"content_type_annotation": content_type_annotation}
if database_name:
model_dump["database_name"] = database_name
if specific_tables:
model_dump["specific_tables"] = specific_tables
model_dump = model_dump.get("parameters", model_dump)
for param_name, param_value in model_dump.items():
parameters.append({
"name": param_name,
"value": param_value
})
request_body_dump = model_dump.get("request_body", model_dump)
content_type = request_body_dump.get("content_type_annotation", "*") if request_body_dump else None
request_body = {"content": {content_type: {"properties": []}}}
for param_name, param_value in request_body_dump.items():
if param_name != "content_type_annotation":
request_body["content"][content_type]["properties"].append({
"name": param_name,
"value": param_value
})
try:
payload = {
"messageVersion": "1.0",
"agent": {
"name": "dsql-analyzer-agent",
"id": "AGENTCORE",
"alias": "LATEST",
"version": "DRAFT"
},
"sessionId": "",
"sessionAttributes": {},
"promptSessionAttributes": {},
"actionGroup": "action_group_postgresql",
"apiPath": "/analyze-postgresql",
"inputText": last_input,
"httpMethod": "POST",
"parameters": parameters
}
if request_body:
payload["requestBody"] = request_body
response = lambda_client.invoke(
FunctionName=POSTGRESQL_LAMBDA_ARN,
InvocationType='RequestResponse',
Payload=json.dumps(payload)
)
response_payload = json.loads(response['Payload'].read().decode('utf-8'))
return str(response_payload)
except Exception as e:
return f"Error executing analyze_postgresql/post: {str(e)}"
action_group_tools = [action_group_mysql_analyze_databases_post, action_group_postgresql_analyze_postgresql_post]
checkpointer_STM = SlidingWindowConversationManager()
@tool
def retrieve_bedrock_agent_knowledge_base(query: str):
"""This is a knowledge base with the following description: Aurora DSQL specific syntax recommendations and supported datatypes . Invoke it with a query to get relevant results."""
client = boto3.client("bedrock-agent-runtime", region_name=REGION)
return client.retrieve(
retrievalQuery={"text": query},
knowledgeBaseId=KNOWLEDGE_BASE_ID,
retrievalConfiguration={
"vectorSearchConfiguration": {"numberOfResults": 10},
},
).get('retrievalResults', [])
tools = [retrieve_bedrock_agent_knowledge_base]
tools_used = set()
strands_telemetry = StrandsTelemetry()
strands_telemetry.setup_meter(enable_console_exporter=True)
strands_telemetry.setup_console_exporter()
tools += action_group_tools
def make_msg(role, text):
return {
"role": role,
"content": [{"text": text}]
}
def inference(model, messages, system_prompt=""):
async def run_inference():
results = []
async for event in model.stream(messages=messages, system_prompt=system_prompt):
results.append(event)
return results
response = asyncio.run(run_inference())
text = ""
for chunk in response:
if not "contentBlockDelta" in chunk:
continue
text += chunk["contentBlockDelta"].get("delta", {}).get("text", "")
return text
_agent = None
first_turn = True
last_input = ""
user_id = ""
def get_agent():
global _agent
if _agent is None:
system_prompt = ORCHESTRATION_TEMPLATE
_agent = Agent(
model=llm_ORCHESTRATION,
system_prompt=system_prompt,
tools=tools,
conversation_manager=checkpointer_STM
)
return _agent
def invoke_agent(question: str):
global last_input
last_input = question
agent = get_agent()
original_stdout = sys.stdout
sys.stdout = io.StringIO()
try:
response = agent(question)
finally:
sys.stdout = original_stdout
return response
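# AgentCore runtime entrypoint: expects a JSON payload with a "message" field and returns the
# agent response plus any sources and tools used, keyed under "result".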
@app.entrypoint
def endpoint(payload, context):
try:
session_id = getattr(context, 'session_id', None) or payload.get("sessionId") or uuid.uuid4().hex[:8]
tools_used.clear()
agent_query = payload.get("message", "")
if not agent_query:
return {'error': "No query provided, please provide a 'message' field in the payload."}
agent_result = invoke_agent(agent_query)
tools_used.update(list(agent_result.metrics.tool_metrics.keys()))
response_content = str(agent_result)
# Gathering sources from the response
sources = []
urls = re.findall('(?:https?://|www\\.)(?:[a-zA-Z0-9-]+\\.)+[a-zA-Z]{2,}(?:/[^/\\s]*)*', response_content)
source_tags = re.findall(r"<source>(.*?)</source>", response_content)
sources.extend(urls)
sources.extend(source_tags)
sources = list(set(sources))
formatted_messages = [
(agent_query, "USER"), (response_content if response_content else "No Response.", "ASSISTANT")]
return {'result': {'response': response_content, 'sources': sources, 'tools_used': list(
tools_used), 'sessionId': session_id, 'messages': formatted_messages}}
except Exception as e:
return {'error': str(e)}
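# Local REPL for testing the agent outside AgentCore (run with "--cli"); it reuses the same
# endpoint() handler and times out after 10 minutes of inactivity.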
def cli():
global user_id
user_id = "8fe6d7f3"
session_id = uuid.uuid4().hex[:8].lower()
try:
while True:
try:
query = inputimeout("\nEnter your question (or 'exit' to quit): ", timeout=600)
if query.lower() == "exit":
break
response = endpoint({"message": query}, RequestContext(session_id=session_id))
result = response.get('result', {})
if not result:
print(" Error: " + str(response.get('error', 'Unknown error')))
continue
print(f"\nResponse: {result.get('response', 'No response provided')}")
if result["sources"]:
print(f" Sources: {', '.join(set(result.get('sources', [])))}")
if result["tools_used"]:
tools_used.update(result.get('tools_used', []))
print(f"\n Tools Used: {', '.join(tools_used)}")
tools_used.clear()
except KeyboardInterrupt:
print("\n\nExiting...")
break
except TimeoutOccurred:
print("\n\nNo input received in the last 0 seconds. Exiting...")
break
except Exception as e:
print("\n\nError: {}".format(e))
finally:
print("Session ended.")
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--cli":
cli()
else:
app.run()
AGENTEOF
- |
cat > Dockerfile << 'EOF'
FROM public.ecr.aws/docker/library/python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
RUN useradd -m -u 1000 bedrock_agentcore
USER bedrock_agentcore
EXPOSE 8080
COPY dsql_agent.py .
CMD ["python", "-m", "dsql_agent"]
EOF
- docker build -t $IMAGE_REPO_NAME:$IMAGE_TAG .
- docker tag $IMAGE_REPO_NAME:$IMAGE_TAG $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$IMAGE_REPO_NAME:$IMAGE_TAG
post_build:
commands:
- docker push $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$IMAGE_REPO_NAME:$IMAGE_TAG
TriggerImageBuild:
Type: Custom::CodeBuildTrigger
DependsOn:
- ECRRepository
- AgentImageBuildProject
- CodeBuildTriggerFunction
- DSQLKnowledgeBase
- MySQLAnalysisFunction
- PostgreSQLAnalysisFunction
Properties:
ServiceToken: !GetAtt CodeBuildTriggerFunction.Arn
ProjectName: !Ref AgentImageBuildProject
# AGENTCORE SECTION
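# The runtime name only allows letters, digits, and underscores, so hyphens in the stack name
# are replaced with underscores before being combined with AgentName.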
AgentRuntime:
Type: AWS::BedrockAgentCore::Runtime
DependsOn: TriggerImageBuild
Properties:
AgentRuntimeName: !Sub
- "${StackNameUnderscore}_${AgentName}"
- StackNameUnderscore: !Join ["_", !Split ["-", !Ref "AWS::StackName"]]
AgentRuntimeArtifact:
ContainerConfiguration:
ContainerUri: !Sub "${ECRRepository.RepositoryUri}:${ImageTag}"
RoleArn: !GetAtt AgentExecutionRole.Arn
NetworkConfiguration:
NetworkMode: !Ref NetworkMode
VpcConfiguration: !If
- UsePrivateNetwork
- SecurityGroupIds:
- !Ref AgentCoreSecurityGroupId
SubnetIds: !Ref AgentCoreSubnetIds
- !Ref 'AWS::NoValue'
Description: !Sub "Aurora DSQL Schema Analyzer Agent for ${AWS::StackName}"
EnvironmentVariables:
KNOWLEDGE_BASE_ID: !Ref DSQLKnowledgeBase
MYSQL_LAMBDA_ARN: !GetAtt MySQLAnalysisFunction.Arn
POSTGRESQL_LAMBDA_ARN: !GetAtt PostgreSQLAnalysisFunction.Arn
AWS_REGION: !Ref AWS::Region
FOUNDATION_MODEL_ID: !If
- IsFullArn
- !Ref FoundationModelId
- !If
- IsInferenceProfile
- !Sub 'arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:inference-profile/${FoundationModelId}'
- !Sub 'arn:aws:bedrock:${AWS::Region}::foundation-model/${FoundationModelId}'
# OBSERVABILITY SECTION
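# Vended-log delivery for the AgentCore runtime: the DeliverySource (runtime), the
# DeliveryDestination (KMS-encrypted log group), and the Delivery resource that links them
# route the runtime's application logs to CloudWatch Logs.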
BedrockAgentCoreLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/vendedlogs/bedrock-agentcore/${AgentRuntime.AgentRuntimeId}'
RetentionInDays: 14
KmsKeyId: !GetAtt EncryptionKey.Arn
LogsDeliverySource:
Type: AWS::Logs::DeliverySource
Properties:
Name: !Sub '${AgentRuntime.AgentRuntimeId}-logs-source'
LogType: 'APPLICATION_LOGS'
ResourceArn: !GetAtt AgentRuntime.AgentRuntimeArn
DependsOn: AgentRuntime
LogsDeliveryDestination:
Type: AWS::Logs::DeliveryDestination
Properties:
Name: !Sub '${AgentRuntime.AgentRuntimeId}-logs-destination'
DeliveryDestinationType: 'CWL'
DestinationResourceArn: !GetAtt BedrockAgentCoreLogGroup.Arn
LogsDelivery:
Type: AWS::Logs::Delivery
Properties:
DeliverySourceName: !Ref LogsDeliverySource
DeliveryDestinationArn: !GetAtt LogsDeliveryDestination.Arn
DependsOn:
- LogsDeliverySource
- LogsDeliveryDestination
# OUTPUTS SECTION
Outputs:
AgentRuntimeId:
Description: "ID of the DSQL Analyzer AgentCore Runtime"
Value: !GetAtt AgentRuntime.AgentRuntimeId
Export:
Name: !Sub "${AWS::StackName}-AgentRuntimeId"
AgentRuntimeArn:
Description: "ARN of the DSQL Analyzer AgentCore Runtime"
Value: !GetAtt AgentRuntime.AgentRuntimeArn
Export:
Name: !Sub "${AWS::StackName}-AgentRuntimeArn"
KnowledgeBaseId:
Description: "ID of the Aurora DSQL Knowledge Base"
Value: !Ref DSQLKnowledgeBase
Export:
Name: !Sub "${AWS::StackName}-KnowledgeBaseId"
MySQLLambdaArn:
Description: "ARN of MySQL Analysis Lambda Function"
Value: !GetAtt MySQLAnalysisFunction.Arn
Export:
Name: !Sub "${AWS::StackName}-MySQLLambdaArn"
PostgreSQLLambdaArn:
Description: "ARN of PostgreSQL Analysis Lambda Function"
Value: !GetAtt PostgreSQLAnalysisFunction.Arn
Export:
Name: !Sub "${AWS::StackName}-PostgreSQLLambdaArn"
ECRRepositoryUri:
Description: "URI of the ECR Repository"
Value: !GetAtt ECRRepository.RepositoryUri
Export:
Name: !Sub "${AWS::StackName}-ECRRepositoryUri"
InvokeCommand:
Description: "Command to invoke the agent"
Value: !Sub |
agentcore invoke --agent-runtime-id ${AgentRuntime.AgentRuntimeId} --message "Analyze techmart_db for Aurora DSQL compatibility"