diff --git a/lambda-durable-hitl-python-sam/.gitignore b/lambda-durable-hitl-python-sam/.gitignore new file mode 100644 index 000000000..d1021e88d --- /dev/null +++ b/lambda-durable-hitl-python-sam/.gitignore @@ -0,0 +1,57 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +venv/ +ENV/ +env/ +.venv + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.hypothesis/ +*.cover +.tox/ + +# IDEs +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# AWS SAM +.aws-sam/ +samconfig.toml + +# Logs +*.log + +# OS +.DS_Store +Thumbs.db + +# Environment variables +.env +.env.local diff --git a/lambda-durable-hitl-python-sam/README.md b/lambda-durable-hitl-python-sam/README.md new file mode 100644 index 000000000..fb8fbc178 --- /dev/null +++ b/lambda-durable-hitl-python-sam/README.md @@ -0,0 +1,154 @@ +# Lambda Durable Functions to DynamoDB with Human-in-the-Loop + +This pattern demonstrates how to implement Lambda durable functions with Human-in-the-Loop (HITL) approval workflows. The workflow pauses execution, waits for human approval via callback, and resumes based on the decision while maintaining state across the pause/resume cycle. + +Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/lambda-durable-hitl-python-sam + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed +* [Docker](https://docs.docker.com/get-docker/) installed (for building Lambda container images) +* [Python 3.13](https://www.python.org/downloads/) or later + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +1. Change directory to the pattern directory: + ``` + cd lambda-durable-hitl-python-sam + ``` +1. From the command line, use AWS SAM to build the application: + ``` + sam build + ``` +1. From the command line, use AWS SAM to deploy the AWS resources for the pattern as specified in the template.yaml file: + ``` + sam deploy --guided + ``` +1. During the prompts: + * Enter a stack name + * Enter the desired AWS Region + * Enter the ApprovalTimeoutSeconds parameter (default: 300 seconds) + * Allow SAM CLI to create IAM roles with the required permissions. + + Once you have run `sam deploy --guided` mode once and saved arguments to a configuration file (samconfig.toml), you can use `sam deploy` in future to use these defaults. + +1. Note the outputs from the SAM deployment process. These contain the resource names and/or ARNs which are used for testing. + +## How it works + +This pattern implements a Human-in-the-Loop approval workflow using Lambda durable functions: + +1. **Workflow Lambda** creates an approval request in DynamoDB and sends an SNS notification to approvers +2. The workflow pauses execution using `callback.result()` and waits for a callback +3. **Approval API Lambda** processes the approval decision and calls the Lambda durable execution callback API +4. The workflow resumes automatically when the callback is invoked and completes with the decision + +The pattern uses the AWS Durable Execution SDK for Python with the `@durable_execution` decorator to maintain state across the pause/resume cycle. The callback pattern ensures no compute charges while waiting for human decisions. + +### Architecture Components + +- **Workflow Lambda**: Orchestrates the approval workflow using Lambda durable functions SDK with callback pattern +- **Approval API Lambda**: Processes approval/rejection decisions and invokes the callback API to resume the workflow +- **DynamoDB Table**: Stores approval request state including callback tokens, document details, and timestamps +- **SNS Topic**: Sends notifications to approvers when new approval requests are created + +## Testing + +### Set Environment Variables + +```bash +export AWS_DEFAULT_REGION=us-east-1 +export STACK_NAME= + +# Get function names from CloudFormation outputs +export WORKFLOW_FUNCTION=$(aws cloudformation describe-stacks \ + --stack-name $STACK_NAME \ + --query 'Stacks[0].Outputs[?OutputKey==`WorkflowFunctionName`].OutputValue' \ + --output text) + +export APPROVAL_API_FUNCTION=$(aws cloudformation describe-stacks \ + --stack-name $STACK_NAME \ + --query 'Stacks[0].Outputs[?OutputKey==`ApprovalApiFunctionName`].OutputValue' \ + --output text) +``` + +### Invoke the Workflow + +```bash +# Invoke workflow with a document approval request +aws lambda invoke \ + --function-name $WORKFLOW_FUNCTION \ + --cli-binary-format raw-in-base64-out \ + --payload '{"document_id":"doc-123","document_name":"Q4 Budget Proposal","requester":"user@example.com"}' \ + response.json + +# Check response +cat response.json +``` + +### List Pending Approvals + +```bash +# Scan DynamoDB for pending approval requests +aws dynamodb scan \ + --table-name $STACK_NAME-ApprovalRequests \ + --filter-expression "#status = :pending" \ + --expression-attribute-names '{"#status":"status"}' \ + --expression-attribute-values '{":pending":{"S":"pending"}}' \ + --max-items 10 +``` + +### Submit Approval Decision + +```bash +# Get the approval_id from the DynamoDB scan output above + +# Approve the request +aws lambda invoke \ + --function-name $APPROVAL_API_FUNCTION \ + --cli-binary-format raw-in-base64-out \ + --payload '{"action":"decide","approval_id":"","decision":"approved","approver":"test-approver","comments":"Looks good"}' \ + approval_response.json + +# Check response +cat approval_response.json +``` + +### Verify Workflow Completion + +```bash +# Check DynamoDB to verify status changed to approved +aws dynamodb get-item \ + --table-name $STACK_NAME-ApprovalRequests \ + --key '{"approval_id":{"S":""}}' + +# Check CloudWatch Logs for workflow completion +aws logs tail /aws/lambda/$WORKFLOW_FUNCTION --follow +``` + +Expected output: The workflow should complete and return the approval decision. The DynamoDB item should show status as "approved" with the approver's comments and timestamp. + +## Cleanup + +1. Delete the stack + ```bash + sam delete + ``` +1. Confirm the stack has been deleted + ```bash + aws cloudformation list-stacks --query "StackSummaries[?contains(StackName,'$STACK_NAME')].StackStatus" + ``` + +---- +Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/lambda-durable-hitl-python-sam/example-pattern.json b/lambda-durable-hitl-python-sam/example-pattern.json new file mode 100644 index 000000000..16f061b7b --- /dev/null +++ b/lambda-durable-hitl-python-sam/example-pattern.json @@ -0,0 +1,86 @@ +{ + "title": "Lambda Durable Functions with Human-in-the-Loop", + "description": "Demonstrates Lambda durable functions with human approval workflow using Python 3.13, DynamoDB, and SNS", + "language": "Python", + "level": "300", + "framework": "SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates how to pause Lambda execution, wait for human approval, and resume using the Lambda durable functions SDK.", + "The Workflow Lambda creates an approval request in DynamoDB and polls for decisions using durable waits (no compute charges during waits).", + "An SNS notification is sent to approvers, who can submit their decision via the Approval API Lambda function.", + "The Workflow Lambda detects the decision during polling and resumes execution with the approval result.", + "The pattern includes timeout handling, status tracking, and a complete audit trail of all approval decisions." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-durable-hitl-python-sam", + "templateURL": "serverless-patterns/lambda-durable-hitl-python-sam", + "projectFolder": "lambda-durable-hitl-python-sam", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Lambda Durable Functions Documentation", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html" + }, + { + "text": "AWS SAM Documentation", + "link": "https://docs.aws.amazon.com/serverless-application-model/" + }, + { + "text": "DynamoDB Best Practices", + "link": "https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/best-practices.html" + }, + { + "text": "Amazon SNS Documentation", + "link": "https://docs.aws.amazon.com/sns/" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the README in the GitHub repo for detailed testing instructions.", + "Test the approval workflow using AWS CLI:", + "1. Publish Lambda version: aws lambda publish-version --function-name ", + "2. Invoke workflow: aws lambda invoke --function-name : --invocation-type Event --payload '{...}' response.json", + "3. List approvals: aws dynamodb scan --table-name ", + "4. Submit decision: aws lambda invoke --function-name --payload '{\"action\":\"decide\",...}' response.json" + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete" + ] + }, + "authors": [ + { + "name": "Mian Tariq", + "bio": "Cloud Solutions Architect" + } + ], + "patternArch": { + "icon1": { + "name": "lambda", + "label": "Lambda Durable Functions" + }, + "icon2": { + "name": "dynamodb", + "label": "Amazon DynamoDB" + }, + "icon3": { + "name": "sns", + "label": "Amazon SNS" + } + } +} diff --git a/lambda-durable-hitl-python-sam/src/approval_api/Dockerfile b/lambda-durable-hitl-python-sam/src/approval_api/Dockerfile new file mode 100644 index 000000000..ce9525bb7 --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/approval_api/Dockerfile @@ -0,0 +1,14 @@ +FROM public.ecr.aws/lambda/python:3.13 + +# Copy requirements file +COPY approval_api/requirements.txt ${LAMBDA_TASK_ROOT}/ + +# Install dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy function code +COPY approval_api/app.py ${LAMBDA_TASK_ROOT}/ +COPY shared/*.py ${LAMBDA_TASK_ROOT}/ + +# Set the CMD to your handler +CMD ["app.lambda_handler"] diff --git a/lambda-durable-hitl-python-sam/src/approval_api/app.py b/lambda-durable-hitl-python-sam/src/approval_api/app.py new file mode 100644 index 000000000..d5241d23d --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/approval_api/app.py @@ -0,0 +1,482 @@ +""" +Approval API Lambda Function + +This Lambda function handles approval/rejection decisions from the CLI tool. +It retrieves callback tokens from DynamoDB and resumes workflow execution. + +Primary Use Case: Document Approval Processing +---------------------------------------------- +Processes approval decisions submitted via CLI, validates the request, +and invokes the durable execution callback to resume the paused workflow. + +Additional Use Cases: +-------------------- +1. Expense Approval: Manager reviews expense report and approves/rejects, + this function validates the decision and resumes expense processing workflow. + +2. Content Moderation: Moderator reviews flagged content and approves/rejects, + this function processes the decision and resumes content publishing workflow. + +3. Budget Proposal Review: Finance team reviews budget proposal and approves/rejects, + this function validates the decision and resumes budget allocation workflow. + +Key Responsibilities: +- Validate approval request exists and is still pending +- Check expiration status to prevent late approvals +- Retrieve callback token from DynamoDB +- Invoke durable execution callback to resume workflow +- Update approval status with decision and comments +- Maintain complete audit trail of all decisions +""" + +import json +import logging +import os +import sys +from typing import Dict, Any, Optional, Tuple +from datetime import datetime + +# Add shared module to path +sys.path.insert(0, os.path.dirname(__file__)) + +from models import ApprovalDecisionRequest, ApprovalRequest, ApprovalStatus, Decision +from dynamodb_operations import get_approval_request, update_approval_status + +# Configure structured JSON logging +logger = logging.getLogger() +logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO')) + + +def validate_input(event: Dict[str, Any]) -> Tuple[bool, Optional[str], Optional[ApprovalDecisionRequest]]: + """ + Validate input data from the Lambda event. + + Validates that required fields are present and decision values are valid. + + Args: + event: Lambda event dictionary + + Returns: + Tuple of (is_valid, error_message, parsed_request) + - is_valid: True if input is valid, False otherwise + - error_message: Descriptive error message if invalid, None if valid + - parsed_request: Parsed ApprovalDecisionRequest if valid, None if invalid + """ + # Handle both API Gateway format (with body) and direct Lambda invocation + if "body" in event: + try: + body = json.loads(event["body"]) + except json.JSONDecodeError as e: + return False, f"Invalid JSON in request body: {str(e)}", None + else: + body = event + + # Validate required fields + if "approval_id" not in body: + return False, "Missing required field: approval_id", None + + if "decision" not in body: + return False, "Missing required field: decision", None + + # Validate approval_id is not empty + approval_id = body["approval_id"] + if not approval_id or not isinstance(approval_id, str) or not approval_id.strip(): + return False, "Invalid approval_id: must be a non-empty string", None + + # Validate decision value + decision_str = body["decision"] + if decision_str not in ["approved", "rejected"]: + return False, f"Invalid decision value: '{decision_str}'. Must be 'approved' or 'rejected'", None + + # Parse the request + try: + request = ApprovalDecisionRequest.from_api_event(event) + return True, None, request + except Exception as e: + return False, f"Failed to parse request: {str(e)}", None + + +def validate_approval_request(approval: ApprovalRequest) -> Tuple[bool, Optional[str]]: + """ + Validate that approval request is in a valid state for processing. + + Checks that the request exists, is pending, and has not expired. + + Args: + approval: ApprovalRequest object from DynamoDB + + Returns: + Tuple of (is_valid, error_message) + - is_valid: True if request is valid, False otherwise + - error_message: Descriptive error message if invalid, None if valid + """ + # Check if request is expired + if approval.is_expired(): + logger.warning( + json.dumps({ + "message": "Approval request has expired", + "approval_id": approval.approval_id, + "expires_at": approval.expires_at.isoformat(), + "current_time": datetime.now().isoformat() + }) + ) + return False, f"Approval request has expired at {approval.expires_at.isoformat()}" + + # Check if request is still pending + if approval.status != ApprovalStatus.PENDING: + logger.warning( + json.dumps({ + "message": "Approval request is not in pending status", + "approval_id": approval.approval_id, + "current_status": approval.status.value + }) + ) + + # Provide specific error messages based on status + if approval.status == ApprovalStatus.APPROVED: + return False, f"Approval request has already been approved" + elif approval.status == ApprovalStatus.REJECTED: + return False, f"Approval request has already been rejected" + elif approval.status == ApprovalStatus.TIMEOUT: + return False, f"Approval request has timed out" + else: + return False, f"Approval request is in invalid status: {approval.status.value}" + + return True, None + + +def invoke_durable_callback( + callback_id: str, + decision: Decision, + comments: Optional[str] = None +) -> Tuple[bool, Optional[str]]: + """ + Complete the callback to resume the workflow. + + Uses the Lambda service APIs SendDurableExecutionCallbackSuccess or + SendDurableExecutionCallbackFailure to resume the paused durable execution. + + Args: + callback_id: Callback ID from the paused durable execution + decision: Approval decision (approved/rejected) + comments: Optional comments from the approver + + Returns: + Tuple of (success, error_message) + - success: True if callback completed successfully, False otherwise + - error_message: Error description if failed, None if successful + """ + import boto3 + from botocore.exceptions import ClientError + + lambda_client = boto3.client('lambda') + + # Prepare callback payload with decision and comments + callback_payload = { + "decision": decision.value, + "comments": comments + } + + logger.info( + json.dumps({ + "message": "Completing durable execution callback", + "decision": decision.value, + "has_comments": comments is not None + }) + ) + + try: + if decision == Decision.APPROVED: + # Use SendDurableExecutionCallbackSuccess for approved decisions + response = lambda_client.send_durable_execution_callback_success( + CallbackId=callback_id, + Result=json.dumps(callback_payload) + ) + + logger.info( + json.dumps({ + "message": "Callback success sent", + "decision": decision.value + }) + ) + + elif decision == Decision.REJECTED: + # Use SendDurableExecutionCallbackFailure for rejected decisions + response = lambda_client.send_durable_execution_callback_failure( + CallbackId=callback_id, + Error="ApprovalRejected", + Cause=json.dumps(callback_payload) + ) + + logger.info( + json.dumps({ + "message": "Callback failure sent", + "decision": decision.value + }) + ) + + else: + error_msg = f"Invalid decision for callback: {decision.value}" + logger.error( + json.dumps({ + "message": "Invalid decision type", + "decision": decision.value + }) + ) + return False, error_msg + + logger.info( + json.dumps({ + "message": "Durable execution callback completed successfully", + "decision": decision.value + }) + ) + + return True, None + + except ClientError as e: + error_msg = f"Failed to complete callback: {e.response['Error']['Code']} - {e.response['Error']['Message']}" + logger.error( + json.dumps({ + "message": "Callback completion failed", + "error": error_msg, + "error_code": e.response['Error']['Code'] + }) + ) + return False, error_msg + + except Exception as e: + error_msg = f"Unexpected error completing callback: {str(e)}" + logger.error( + json.dumps({ + "message": "Unexpected callback error", + "error": error_msg, + "error_type": type(e).__name__ + }) + ) + return False, error_msg + + +def create_error_response(status_code: int, error_code: str, error_message: str, details: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """ + Create a standardized error response. + + Args: + status_code: HTTP status code + error_code: Error code identifier + error_message: Human-readable error message + details: Optional additional error details + + Returns: + Dict containing error response in standard format + """ + error_response = { + "error": { + "code": error_code, + "message": error_message + } + } + + if details: + error_response["error"]["details"] = details + + return { + "statusCode": status_code, + "body": json.dumps(error_response) + } + + +def create_success_response(approval_id: str, decision: Decision, message: str = "Decision processed successfully") -> Dict[str, Any]: + """ + Create a standardized success response. + + Args: + approval_id: Approval request identifier + decision: Decision that was processed + message: Success message + + Returns: + Dict containing success response + """ + return { + "statusCode": 200, + "body": json.dumps({ + "message": message, + "approval_id": approval_id, + "decision": decision.value + }) + } + + +def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: + """ + Processes approval decisions and resumes workflow. + + This function: + 1. Validates input data + 2. Retrieves the approval request from DynamoDB + 3. Validates the request is pending and not expired + 4. Invokes the durable execution callback + 5. Updates the approval status in DynamoDB + + Args: + event: Lambda event with approval_id, decision, comments (optional) + context: Lambda context + + Returns: + Response with success/error status + """ + # Log invocation with structured logging + logger.info( + json.dumps({ + "message": "Approval API Lambda invoked", + "request_id": context.request_id if hasattr(context, 'request_id') else None + }) + ) + + try: + # Task 6.6: Validate input data + is_valid, error_message, decision_request = validate_input(event) + if not is_valid: + logger.warning( + json.dumps({ + "message": "Input validation failed", + "error": error_message + }) + ) + return create_error_response(400, "INVALID_REQUEST", error_message) + + logger.info( + json.dumps({ + "message": "Input validated successfully", + "approval_id": decision_request.approval_id, + "decision": decision_request.decision.value + }) + ) + + # Task 6.2: Retrieve approval request from DynamoDB + approval = get_approval_request(decision_request.approval_id) + + if approval is None: + logger.warning( + json.dumps({ + "message": "Approval request not found", + "approval_id": decision_request.approval_id + }) + ) + return create_error_response( + 404, + "APPROVAL_NOT_FOUND", + f"Approval request with ID '{decision_request.approval_id}' not found", + {"approval_id": decision_request.approval_id} + ) + + logger.info( + json.dumps({ + "message": "Approval request retrieved", + "approval_id": approval.approval_id, + "status": approval.status.value, + "expires_at": approval.expires_at.isoformat() + }) + ) + + # Task 6.2: Validate approval request state + is_valid, error_message = validate_approval_request(approval) + if not is_valid: + # Determine appropriate error code based on the error + if "expired" in error_message.lower() or "timed out" in error_message.lower(): + error_code = "APPROVAL_EXPIRED" + status_code = 409 + elif "already been" in error_message.lower(): + error_code = "APPROVAL_ALREADY_DECIDED" + status_code = 409 + else: + error_code = "INVALID_APPROVAL_STATE" + status_code = 409 + + return create_error_response( + status_code, + error_code, + error_message, + {"approval_id": approval.approval_id, "status": approval.status.value} + ) + + # Task 6.5: Update approval status in DynamoDB BEFORE completing callback + # This ensures the decision is available when the workflow resumes + try: + updated_approval = update_approval_status( + approval_id=decision_request.approval_id, + decision=decision_request.decision, + comments=decision_request.comments, + decided_by=decision_request.decided_by + ) + + logger.info( + json.dumps({ + "message": "Approval status updated successfully", + "approval_id": updated_approval.approval_id, + "decision": updated_approval.decision.value, + "status": updated_approval.status.value + }) + ) + + except Exception as e: + logger.error( + json.dumps({ + "message": "Failed to update approval status in DynamoDB", + "approval_id": decision_request.approval_id, + "error": str(e), + "error_type": type(e).__name__ + }) + ) + return create_error_response( + 500, + "DATABASE_UPDATE_FAILED", + f"Failed to update approval status: {str(e)}", + {"approval_id": decision_request.approval_id} + ) + + # Task 6.3: Complete durable execution callback AFTER DynamoDB update + callback_success, callback_error = invoke_durable_callback( + callback_id=approval.callback_token, + decision=decision_request.decision, + comments=decision_request.comments + ) + + if not callback_success: + logger.error( + json.dumps({ + "message": "Failed to complete callback", + "approval_id": approval.approval_id, + "error": callback_error + }) + ) + return create_error_response( + 500, + "CALLBACK_COMPLETION_FAILED", + callback_error, + {"approval_id": approval.approval_id} + ) + + # Return success response + return create_success_response( + approval_id=decision_request.approval_id, + decision=decision_request.decision, + message="Decision processed successfully and workflow resumed" + ) + + except Exception as e: + # Handle unexpected errors + logger.error( + json.dumps({ + "message": "Unexpected error processing approval decision", + "error": str(e), + "error_type": type(e).__name__ + }) + ) + + return create_error_response( + 500, + "INTERNAL_ERROR", + f"An unexpected error occurred: {str(e)}" + ) diff --git a/lambda-durable-hitl-python-sam/src/approval_api/requirements.txt b/lambda-durable-hitl-python-sam/src/approval_api/requirements.txt new file mode 100644 index 000000000..8052b17f9 --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/approval_api/requirements.txt @@ -0,0 +1,9 @@ +# AWS SDK +boto3>=1.34.0 +botocore>=1.34.0 + +# AWS Lambda durable functions SDK +aws-durable-execution-sdk-python>=0.1.0 + +# Data validation and parsing +pydantic>=2.5.0 diff --git a/lambda-durable-hitl-python-sam/src/shared/__init__.py b/lambda-durable-hitl-python-sam/src/shared/__init__.py new file mode 100644 index 000000000..4635553eb --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/shared/__init__.py @@ -0,0 +1,19 @@ +"""Shared modules for Lambda functions.""" + +from .models import ( + ApprovalStatus, + Decision, + ApprovalRequest, + WorkflowEvent, + ApprovalDecisionRequest, + WorkflowResult +) + +__all__ = [ + 'ApprovalStatus', + 'Decision', + 'ApprovalRequest', + 'WorkflowEvent', + 'ApprovalDecisionRequest', + 'WorkflowResult' +] diff --git a/lambda-durable-hitl-python-sam/src/shared/dynamodb_operations.py b/lambda-durable-hitl-python-sam/src/shared/dynamodb_operations.py new file mode 100644 index 000000000..b640de478 --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/shared/dynamodb_operations.py @@ -0,0 +1,273 @@ +""" +DynamoDB operations for the Lambda Durable HITL pattern. + +This module provides functions for creating, retrieving, updating, and querying +approval requests in DynamoDB with proper error handling and retry logic. +""" + +import os +import uuid +import time +from datetime import datetime, timedelta +from typing import List, Optional +import boto3 +from botocore.exceptions import ClientError + +try: + from .models import ApprovalRequest, ApprovalStatus, Decision +except ImportError: + from models import ApprovalRequest, ApprovalStatus, Decision + + +# Initialize DynamoDB client +dynamodb = boto3.resource('dynamodb') + + +def get_table(): + """ + Get the DynamoDB table for approval requests. + + Returns: + boto3 Table resource + """ + table_name = os.environ.get('APPROVALS_TABLE_NAME') + if not table_name: + raise ValueError("APPROVALS_TABLE_NAME environment variable not set") + return dynamodb.Table(table_name) + + +def _retry_with_backoff(operation, max_retries=3, initial_delay=0.1): + """ + Execute a DynamoDB operation with exponential backoff retry logic. + + Args: + operation: Callable that performs the DynamoDB operation + max_retries: Maximum number of retry attempts + initial_delay: Initial delay in seconds (doubles with each retry) + + Returns: + Result of the operation + + Raises: + ClientError: If operation fails after all retries + """ + delay = initial_delay + last_exception = None + + for attempt in range(max_retries): + try: + return operation() + except ClientError as e: + last_exception = e + error_code = e.response['Error']['Code'] + + # Retry on throttling and server errors + if error_code in ['ProvisionedThroughputExceededException', + 'ThrottlingException', + 'InternalServerError', + 'ServiceUnavailable']: + if attempt < max_retries - 1: + time.sleep(delay) + delay *= 2 # Exponential backoff + continue + + # Don't retry on other errors + raise + + # All retries exhausted + raise last_exception + + +def create_approval_request( + callback_token: str, + document_id: str, + document_name: str, + requester: str, + timeout_seconds: int = 3600 +) -> ApprovalRequest: + """ + Create a new approval request in DynamoDB. + + Args: + callback_token: Durable execution callback token + document_id: Unique identifier for the document + document_name: Human-readable document name + requester: User who submitted the document + timeout_seconds: Timeout duration in seconds (default 1 hour) + + Returns: + ApprovalRequest: Created approval request object + + Raises: + ClientError: If DynamoDB operation fails after retries + """ + table = get_table() + + # Generate unique approval ID + approval_id = str(uuid.uuid4()) + + # Calculate timestamps + now = datetime.now() + expires_at = now + timedelta(seconds=timeout_seconds) + + # Calculate TTL (7 days after creation for automatic cleanup) + ttl = int((now + timedelta(days=7)).timestamp()) + + # Create approval request object + approval = ApprovalRequest( + approval_id=approval_id, + callback_token=callback_token, + document_id=document_id, + document_name=document_name, + requester=requester, + status=ApprovalStatus.PENDING, + created_at=now, + updated_at=now, + expires_at=expires_at, + ttl=ttl + ) + + # Store in DynamoDB with retry logic + def put_operation(): + table.put_item(Item=approval.to_dynamodb_item()) + + _retry_with_backoff(put_operation) + + return approval + + +def get_approval_request(approval_id: str) -> Optional[ApprovalRequest]: + """ + Retrieve an approval request from DynamoDB. + + Args: + approval_id: Unique identifier for the approval request + + Returns: + ApprovalRequest: Retrieved approval request, or None if not found + + Raises: + ClientError: If DynamoDB operation fails after retries + """ + table = get_table() + + def get_operation(): + response = table.get_item(Key={'approval_id': approval_id}) + return response + + response = _retry_with_backoff(get_operation) + + if 'Item' not in response: + return None + + return ApprovalRequest.from_dynamodb_item(response['Item']) + + +def update_approval_status( + approval_id: str, + decision: Decision, + comments: Optional[str] = None, + decided_by: Optional[str] = None +) -> ApprovalRequest: + """ + Update the status of an approval request with a decision. + + Args: + approval_id: Unique identifier for the approval request + decision: Approval decision (approved/rejected/timeout) + comments: Optional comments from the approver + decided_by: Optional identifier of who made the decision + + Returns: + ApprovalRequest: Updated approval request object + + Raises: + ValueError: If approval request not found + ClientError: If DynamoDB operation fails after retries + """ + table = get_table() + + # Map decision to status + status_map = { + Decision.APPROVED: ApprovalStatus.APPROVED, + Decision.REJECTED: ApprovalStatus.REJECTED, + Decision.TIMEOUT: ApprovalStatus.TIMEOUT + } + status = status_map[decision] + + # Calculate timestamps + now = datetime.now() + decided_at = now + + # Build update expression + update_expression = "SET #status = :status, updated_at = :updated_at, decision = :decision, decided_at = :decided_at" + expression_attribute_names = {"#status": "status"} + expression_attribute_values = { + ":status": status.value, + ":updated_at": now.isoformat(), + ":decision": decision.value, + ":decided_at": decided_at.isoformat() + } + + if comments: + update_expression += ", comments = :comments" + expression_attribute_values[":comments"] = comments + + if decided_by: + update_expression += ", decided_by = :decided_by" + expression_attribute_values[":decided_by"] = decided_by + + def update_operation(): + response = table.update_item( + Key={'approval_id': approval_id}, + UpdateExpression=update_expression, + ExpressionAttributeNames=expression_attribute_names, + ExpressionAttributeValues=expression_attribute_values, + ReturnValues='ALL_NEW' + ) + return response + + response = _retry_with_backoff(update_operation) + + if 'Attributes' not in response: + raise ValueError(f"Approval request with ID '{approval_id}' not found") + + return ApprovalRequest.from_dynamodb_item(response['Attributes']) + + +def query_pending_approvals(limit: int = 100) -> List[ApprovalRequest]: + """ + Query for all pending approval requests using the StatusIndex GSI. + + Args: + limit: Maximum number of results to return (default 100) + + Returns: + List[ApprovalRequest]: List of pending approval requests + + Raises: + ClientError: If DynamoDB operation fails after retries + """ + table = get_table() + + def query_operation(): + response = table.query( + IndexName='StatusIndex', + KeyConditionExpression='#status = :status', + ExpressionAttributeNames={'#status': 'status'}, + ExpressionAttributeValues={':status': ApprovalStatus.PENDING.value}, + Limit=limit, + ScanIndexForward=False # Sort by created_at descending (newest first) + ) + return response + + response = _retry_with_backoff(query_operation) + + # Convert items to ApprovalRequest objects and filter out expired ones + approvals = [] + for item in response.get('Items', []): + approval = ApprovalRequest.from_dynamodb_item(item) + if approval.is_pending(): # Only include non-expired pending approvals + approvals.append(approval) + + return approvals diff --git a/lambda-durable-hitl-python-sam/src/shared/models.py b/lambda-durable-hitl-python-sam/src/shared/models.py new file mode 100644 index 000000000..205ddf700 --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/shared/models.py @@ -0,0 +1,236 @@ +""" +Data models for the Lambda Durable HITL pattern. + +This module defines the core data structures used throughout the approval workflow, +including approval requests, workflow events, and decision requests. +""" + +from dataclasses import dataclass +from datetime import datetime +from typing import Optional +from enum import Enum +import json + + +class ApprovalStatus(Enum): + """Status values for approval requests.""" + PENDING = "pending" + APPROVED = "approved" + REJECTED = "rejected" + EXPIRED = "expired" + TIMEOUT = "timeout" + + +class Decision(Enum): + """Decision values for approval outcomes.""" + APPROVED = "approved" + REJECTED = "rejected" + TIMEOUT = "timeout" + + +@dataclass +class ApprovalRequest: + """ + Represents an approval request in the system. + + This model stores all information about a pending or completed approval, + including the callback token needed to resume workflow execution. + """ + + approval_id: str + callback_token: str + document_id: str + document_name: str + requester: str + status: ApprovalStatus + created_at: datetime + updated_at: datetime + expires_at: datetime + decision: Optional[Decision] = None + comments: Optional[str] = None + decided_by: Optional[str] = None + decided_at: Optional[datetime] = None + ttl: Optional[int] = None # Unix timestamp for DynamoDB TTL + + def to_dynamodb_item(self) -> dict: + """ + Convert to DynamoDB item format. + + Returns: + dict: DynamoDB item with all fields serialized appropriately + """ + item = { + "approval_id": self.approval_id, + "callback_token": self.callback_token, + "document_id": self.document_id, + "document_name": self.document_name, + "requester": self.requester, + "status": self.status.value, + "created_at": self.created_at.isoformat(), + "updated_at": self.updated_at.isoformat(), + "expires_at": self.expires_at.isoformat(), + } + + if self.decision: + item["decision"] = self.decision.value + if self.comments: + item["comments"] = self.comments + if self.decided_by: + item["decided_by"] = self.decided_by + if self.decided_at: + item["decided_at"] = self.decided_at.isoformat() + if self.ttl: + item["ttl"] = self.ttl + + return item + + @classmethod + def from_dynamodb_item(cls, item: dict) -> 'ApprovalRequest': + """ + Create from DynamoDB item. + + Args: + item: DynamoDB item dictionary + + Returns: + ApprovalRequest: Deserialized approval request object + """ + return cls( + approval_id=item["approval_id"], + callback_token=item["callback_token"], + document_id=item["document_id"], + document_name=item["document_name"], + requester=item["requester"], + status=ApprovalStatus(item["status"]), + created_at=datetime.fromisoformat(item["created_at"]), + updated_at=datetime.fromisoformat(item["updated_at"]), + expires_at=datetime.fromisoformat(item["expires_at"]), + decision=Decision(item["decision"]) if "decision" in item else None, + comments=item.get("comments"), + decided_by=item.get("decided_by"), + decided_at=datetime.fromisoformat(item["decided_at"]) if "decided_at" in item else None, + ttl=item.get("ttl") + ) + + def is_expired(self) -> bool: + """ + Check if approval request has expired. + + Returns: + bool: True if current time is past expires_at, False otherwise + """ + return datetime.now() > self.expires_at + + def is_pending(self) -> bool: + """ + Check if approval request is still pending. + + Returns: + bool: True if status is PENDING and not expired, False otherwise + """ + return self.status == ApprovalStatus.PENDING and not self.is_expired() + + +@dataclass +class WorkflowEvent: + """ + Input event for workflow lambda. + + This represents the initial request to start an approval workflow. + """ + + document_id: str + document_name: str + requester: str + timeout_seconds: Optional[int] = 3600 # Default 1 hour + + @classmethod + def from_lambda_event(cls, event: dict) -> 'WorkflowEvent': + """ + Parse from Lambda event. + + Args: + event: Lambda event dictionary + + Returns: + WorkflowEvent: Parsed workflow event object + """ + return cls( + document_id=event["document_id"], + document_name=event["document_name"], + requester=event["requester"], + timeout_seconds=event.get("timeout_seconds", 3600) + ) + + +@dataclass +class ApprovalDecisionRequest: + """ + Request to submit an approval decision. + + This represents a decision (approve/reject) submitted by a human approver. + """ + + approval_id: str + decision: Decision + comments: Optional[str] = None + decided_by: Optional[str] = None + + @classmethod + def from_api_event(cls, event: dict) -> 'ApprovalDecisionRequest': + """ + Parse from API Gateway event or Lambda invocation event. + + Args: + event: API Gateway or Lambda event dictionary + + Returns: + ApprovalDecisionRequest: Parsed decision request object + """ + # Handle both API Gateway format (with body) and direct Lambda invocation + if "body" in event: + body = json.loads(event["body"]) + else: + body = event + + return cls( + approval_id=body["approval_id"], + decision=Decision(body["decision"]), + comments=body.get("comments"), + decided_by=body.get("decided_by") + ) + + +@dataclass +class WorkflowResult: + """ + Result of workflow execution. + + This represents the final outcome of an approval workflow. + """ + + approval_id: str + document_id: str + decision: Decision + comments: Optional[str] = None + decided_at: Optional[datetime] = None + + def to_dict(self) -> dict: + """ + Convert to dictionary for Lambda response. + + Returns: + dict: Serialized workflow result + """ + result = { + "approval_id": self.approval_id, + "document_id": self.document_id, + "decision": self.decision.value, + } + + if self.comments: + result["comments"] = self.comments + if self.decided_at: + result["decided_at"] = self.decided_at.isoformat() + + return result diff --git a/lambda-durable-hitl-python-sam/src/workflow/Dockerfile b/lambda-durable-hitl-python-sam/src/workflow/Dockerfile new file mode 100644 index 000000000..7165fe6d3 --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/workflow/Dockerfile @@ -0,0 +1,14 @@ +FROM public.ecr.aws/lambda/python:3.13 + +# Copy requirements file +COPY workflow/requirements.txt ${LAMBDA_TASK_ROOT}/ + +# Install dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy function code +COPY workflow/app.py ${LAMBDA_TASK_ROOT}/ +COPY shared/*.py ${LAMBDA_TASK_ROOT}/ + +# Set the CMD to your handler +CMD ["app.lambda_handler"] diff --git a/lambda-durable-hitl-python-sam/src/workflow/app.py b/lambda-durable-hitl-python-sam/src/workflow/app.py new file mode 100644 index 000000000..4f0764b94 --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/workflow/app.py @@ -0,0 +1,236 @@ +""" +Workflow Lambda Function - Durable Execution with Human-in-the-Loop + +This Lambda function orchestrates the approval workflow using Lambda durable functions. +It pauses execution, waits for human approval, and resumes based on the decision. + +Key Features: +- Durable execution ensures no data loss during pause/resume +- context.step() ensures idempotent operations on replay +- Automatic timeout handling for overdue approvals +- Complete audit trail in DynamoDB +""" + +import json +import logging +import os +import sys +from typing import Dict, Any +from datetime import datetime + +# Add shared module to path +sys.path.insert(0, os.path.dirname(__file__)) + +from models import WorkflowEvent, WorkflowResult, Decision +from dynamodb_operations import create_approval_request + +# Import AWS Durable Execution SDK +from aws_durable_execution_sdk_python import ( + DurableContext, + durable_execution, +) +from aws_durable_execution_sdk_python.config import Duration, CallbackConfig + +# Configure structured JSON logging +logger = logging.getLogger() +logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO')) + + +def send_approval_notification( + approval_id: str, + document_name: str, + requester: str, + expires_at: str +) -> bool: + """Send SNS notification to approvers about pending approval request.""" + import boto3 + from botocore.exceptions import ClientError + + sns_client = boto3.client('sns') + sns_topic_arn = os.environ.get('SNS_TOPIC_ARN') + + if not sns_topic_arn: + logger.error(json.dumps({ + "message": "SNS topic ARN not configured", + "approval_id": approval_id + })) + return False + + message = f"""Approval Required: {document_name} + +A document requires your approval: + +Document: {document_name} +Submitted by: {requester} +Approval ID: {approval_id} +Expires: {expires_at} + +To approve or reject, use the Approval API Lambda. +""" + + try: + response = sns_client.publish( + TopicArn=sns_topic_arn, + Subject=f"Approval Required: {document_name}", + Message=message, + MessageAttributes={ + 'approval_id': {'DataType': 'String', 'StringValue': approval_id}, + 'document_name': {'DataType': 'String', 'StringValue': document_name}, + 'requester': {'DataType': 'String', 'StringValue': requester} + } + ) + logger.info(json.dumps({ + "message": "SNS notification sent", + "approval_id": approval_id, + "message_id": response.get('MessageId') + })) + return True + except (ClientError, Exception) as e: + logger.error(json.dumps({ + "message": "Failed to send SNS notification", + "approval_id": approval_id, + "error": str(e) + })) + return False + + +@durable_execution +def lambda_handler(event: Dict[str, Any], context: DurableContext) -> Dict[str, Any]: + """ + Main workflow orchestrator using durable execution. + + CRITICAL: Operations that generate unique IDs (like create_approval_request) + MUST be wrapped in context.step() to ensure idempotency on replay. + """ + context.logger.info(json.dumps({"message": "Workflow Lambda invoked", "event": event})) + + try: + # Parse workflow event + workflow_event = WorkflowEvent.from_lambda_event(event) + + context.logger.info(json.dumps({ + "message": "Workflow event parsed", + "document_id": workflow_event.document_id, + "document_name": workflow_event.document_name, + "requester": workflow_event.requester, + "timeout_seconds": workflow_event.timeout_seconds + })) + + # Create callback configuration with timeout + callback_config = CallbackConfig( + timeout=Duration.from_seconds(workflow_event.timeout_seconds) + ) + + # Create the callback - SDK manages idempotency for this + callback = context.create_callback( + name="approval_callback", + config=callback_config + ) + + context.logger.info(json.dumps({ + "message": "Callback created", + "callback_id": callback.callback_id, + "document_id": workflow_event.document_id + })) + + # Create approval request in DynamoDB + approval = create_approval_request( + callback_token=callback.callback_id, + document_id=workflow_event.document_id, + document_name=workflow_event.document_name, + requester=workflow_event.requester, + timeout_seconds=workflow_event.timeout_seconds + ) + + context.logger.info(json.dumps({ + "message": "Approval request created", + "approval_id": approval.approval_id, + "document_id": approval.document_id, + "expires_at": approval.expires_at.isoformat() + })) + + # Send notification to approvers + notification_sent = send_approval_notification( + approval_id=approval.approval_id, + document_name=approval.document_name, + requester=approval.requester, + expires_at=approval.expires_at.isoformat() + ) + + if not notification_sent: + context.logger.warning(json.dumps({ + "message": "Notification failed but workflow continues", + "approval_id": approval.approval_id + })) + + # Wait for callback from Approval API + context.logger.info(json.dumps({ + "message": "Waiting for approval callback", + "approval_id": approval.approval_id, + "document_id": workflow_event.document_id + })) + + # This blocks until callback is completed via Lambda API or timeout + decision_result = callback.result() + + context.logger.info(json.dumps({ + "message": "Callback received, workflow resuming", + "approval_id": approval.approval_id, + "document_id": workflow_event.document_id, + "decision_result": str(decision_result) + })) + + # Parse the decision from the callback result + try: + if isinstance(decision_result, str): + callback_data = json.loads(decision_result) + elif isinstance(decision_result, dict): + callback_data = decision_result + else: + callback_data = {} + + decision_str = callback_data.get("decision", "approved") + comments = callback_data.get("comments") + except (json.JSONDecodeError, TypeError): + decision_str = "approved" + comments = None + + decision = Decision(decision_str) if decision_str else Decision.APPROVED + + workflow_result = WorkflowResult( + approval_id=approval.approval_id, + document_id=approval.document_id, + decision=decision, + comments=comments, + decided_at=datetime.now() + ) + + context.logger.info(json.dumps({ + "message": "Workflow completed successfully", + "approval_id": approval.approval_id, + "document_id": workflow_result.document_id, + "decision": workflow_result.decision.value if workflow_result.decision else None + })) + + return { + "statusCode": 200, + "body": json.dumps(workflow_result.to_dict()) + } + + except KeyError as e: + error_msg = f"Missing required field in event: {str(e)}" + context.logger.error(json.dumps({ + "message": "Event validation failed", + "error": error_msg, + "event": event + })) + raise ValueError(error_msg) + + except Exception as e: + context.logger.error(json.dumps({ + "message": "Workflow execution failed", + "error": str(e), + "error_type": type(e).__name__, + "event": event + })) + raise diff --git a/lambda-durable-hitl-python-sam/src/workflow/requirements.txt b/lambda-durable-hitl-python-sam/src/workflow/requirements.txt new file mode 100644 index 000000000..99f850c1b --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/workflow/requirements.txt @@ -0,0 +1,9 @@ +# AWS SDK +boto3>=1.34.0 +botocore>=1.34.0 + +# AWS Durable Execution SDK +aws-durable-execution-sdk-python + +# Data validation and parsing +pydantic>=2.5.0 diff --git a/lambda-durable-hitl-python-sam/template.yaml b/lambda-durable-hitl-python-sam/template.yaml new file mode 100644 index 000000000..66b4850bd --- /dev/null +++ b/lambda-durable-hitl-python-sam/template.yaml @@ -0,0 +1,237 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: > + Lambda Durable Functions with Human-in-the-Loop (HITL) Pattern + + This pattern demonstrates how to pause Lambda execution, wait for human approval, + and resume using the Lambda durable functions SDK with Python 3.13. + +Globals: + Function: + Timeout: 900 + MemorySize: 512 + Architectures: + - x86_64 + Environment: + Variables: + LOG_LEVEL: INFO + +Parameters: + ApprovalTimeoutSeconds: + Type: Number + Default: 300 + Description: Default timeout for approval requests in seconds (5 minutes) + MinValue: 60 + MaxValue: 86400 + +Resources: + # DynamoDB Table for storing approval requests + ApprovalRequestsTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub '${AWS::StackName}-ApprovalRequests' + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: approval_id + AttributeType: S + - AttributeName: status + AttributeType: S + - AttributeName: created_at + AttributeType: S + KeySchema: + - AttributeName: approval_id + KeyType: HASH + GlobalSecondaryIndexes: + - IndexName: StatusIndex + KeySchema: + - AttributeName: status + KeyType: HASH + - AttributeName: created_at + KeyType: RANGE + Projection: + ProjectionType: ALL + TimeToLiveSpecification: + AttributeName: ttl + Enabled: true + SSESpecification: + SSEEnabled: true + PointInTimeRecoverySpecification: + PointInTimeRecoveryEnabled: true + Tags: + - Key: Pattern + Value: lambda-durable-hitl + + # SNS Topic for approval notifications + ApprovalNotificationsTopic: + Type: AWS::SNS::Topic + Properties: + TopicName: !Sub '${AWS::StackName}-ApprovalNotifications' + DisplayName: Approval Notifications + Tags: + - Key: Pattern + Value: lambda-durable-hitl + + # IAM Role for Workflow Lambda + WorkflowLambdaRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: lambda.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + Policies: + - PolicyName: WorkflowLambdaPolicy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - dynamodb:PutItem + - dynamodb:GetItem + - dynamodb:UpdateItem + Resource: !GetAtt ApprovalRequestsTable.Arn + - Effect: Allow + Action: + - sns:Publish + Resource: !Ref ApprovalNotificationsTopic + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-*' + - Effect: Allow + Action: + - lambda:CheckpointDurableExecution + - lambda:GetDurableExecutionState + Resource: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-Workflow:$LATEST/durable-execution/*' + Tags: + - Key: Pattern + Value: lambda-durable-hitl + + # Workflow Lambda Function (Durable Execution) + WorkflowFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub '${AWS::StackName}-Workflow' + PackageType: Image + ImageUri: workflow:latest + Role: !GetAtt WorkflowLambdaRole.Arn + Timeout: 900 + MemorySize: 512 + DurableConfig: + ExecutionTimeout: 3600 + Environment: + Variables: + APPROVALS_TABLE_NAME: !Ref ApprovalRequestsTable + SNS_TOPIC_ARN: !Ref ApprovalNotificationsTopic + APPROVAL_TIMEOUT_SECONDS: !Ref ApprovalTimeoutSeconds + Tags: + Pattern: lambda-durable-hitl + Metadata: + DockerTag: latest + DockerContext: ./src + Dockerfile: workflow/Dockerfile + + # IAM Role for Approval API Lambda + ApprovalApiLambdaRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: lambda.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + Policies: + - PolicyName: ApprovalApiLambdaPolicy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - dynamodb:GetItem + - dynamodb:UpdateItem + - dynamodb:Query + Resource: + - !GetAtt ApprovalRequestsTable.Arn + - !Sub '${ApprovalRequestsTable.Arn}/index/*' + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-*' + - Effect: Allow + Action: + - lambda:SendDurableExecutionCallbackSuccess + - lambda:SendDurableExecutionCallbackFailure + Resource: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-Workflow:$LATEST/durable-execution/*' + Tags: + - Key: Pattern + Value: lambda-durable-hitl + + # Approval API Lambda Function + ApprovalApiFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub '${AWS::StackName}-ApprovalApi' + PackageType: Image + ImageUri: approval_api:latest + Role: !GetAtt ApprovalApiLambdaRole.Arn + Environment: + Variables: + APPROVALS_TABLE_NAME: !Ref ApprovalRequestsTable + Tags: + Pattern: lambda-durable-hitl + Metadata: + DockerTag: latest + DockerContext: ./src + Dockerfile: approval_api/Dockerfile + +Outputs: + ApprovalRequestsTableName: + Description: DynamoDB table name for approval requests + Value: !Ref ApprovalRequestsTable + Export: + Name: !Sub '${AWS::StackName}-ApprovalRequestsTableName' + + ApprovalNotificationsTopicArn: + Description: SNS topic ARN for approval notifications + Value: !Ref ApprovalNotificationsTopic + Export: + Name: !Sub '${AWS::StackName}-ApprovalNotificationsTopicArn' + + WorkflowFunctionArn: + Description: Workflow Lambda function ARN + Value: !GetAtt WorkflowFunction.Arn + Export: + Name: !Sub '${AWS::StackName}-WorkflowFunctionArn' + + WorkflowFunctionName: + Description: Workflow Lambda function name + Value: !Ref WorkflowFunction + Export: + Name: !Sub '${AWS::StackName}-WorkflowFunctionName' + + ApprovalApiFunctionArn: + Description: Approval API Lambda function ARN + Value: !GetAtt ApprovalApiFunction.Arn + Export: + Name: !Sub '${AWS::StackName}-ApprovalApiFunctionArn' + + ApprovalApiFunctionName: + Description: Approval API Lambda function name + Value: !Ref ApprovalApiFunction + Export: + Name: !Sub '${AWS::StackName}-ApprovalApiFunctionName' + + StackName: + Description: CloudFormation stack name + Value: !Ref AWS::StackName + Export: + Name: !Sub '${AWS::StackName}-StackName'