Building an Automated RAG System with AWS Bedrock, Lambda, and Terraform

In the rapidly evolving world of AI applications, Retrieval Augmented Generation (RAG) has emerged as a powerful pattern for enhancing Large Language Models (LLMs) with up-to-date, domain-specific knowledge. This article walks through building a production-ready RAG ingestion pipeline using AWS services that automatically fetches news from RSS feeds and syncs them with an AWS Bedrock Knowledge Base.

The RAG Architecture Challenge
#

When implementing RAG systems, the ingestion pipeline is often overlooked but critically important. A well-designed pipeline should:

  1. Automatically retrieve fresh content from reliable sources
  2. Process and structure the content appropriately
  3. Efficiently sync data with your vector database or knowledge base
  4. Handle errors gracefully and provide operational visibility
  5. Scale with your needs while remaining cost-effective

Let’s examine a real-world implementation that addresses these challenges.

System Architecture Overview
#

The system consists of two Lambda functions orchestrated by EventBridge:

RAG System Architecture

  1. Fetcher Lambda: Triggered on a schedule, retrieves articles from Yahoo News RSS feeds, processes them, and uploads to S3
  2. Syncer Lambda: Triggered by the Fetcher Lambda, starts an ingestion job in AWS Bedrock Knowledge Base

This design follows the single-responsibility principle, making each component focused and maintainable.
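
The hand-off between the two functions is a small JSON event rather than a queue or stream. Here is a minimal sketch of that contract, mirroring the payload the fetcher builds in the full listing later in this article (the example S3 key is a placeholder):

import json
from datetime import datetime

import boto3

lambda_client = boto3.client("lambda")

# Event shape the fetcher sends and the syncer expects
payload = {
    "uploaded_keys": ["incoming/2024-01-01/abc123.json"],  # placeholder S3 key
    "source": "yahoo_news",
    "timestamp": datetime.utcnow().isoformat(),
}

# InvocationType="Event" makes the call asynchronous; the fetcher does not wait
lambda_client.invoke(
    FunctionName="syncer_lambda",  # var.syncer_lambda_name in the Terraform below
    InvocationType="Event",
    Payload=json.dumps(payload),
)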

Infrastructure as Code with Terraform
#

The entire infrastructure is defined using Terraform, enabling consistent deployments and making the solution reproducible. Let’s break down the key components:

Core Resources
#

# S3 Bucket for article storage
resource "aws_s3_bucket" "demo" {
  bucket = local.bucket_name
  tags   = local.common_tags
}

# Enable versioning and encryption
resource "aws_s3_bucket_versioning" "demo" {
  bucket = aws_s3_bucket.demo.id
  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "demo" {
  bucket = aws_s3_bucket.demo.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

The S3 bucket serves as the central storage for our news articles, with proper versioning and encryption configured to meet security best practices.
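
To make the storage layout concrete: each article becomes one JSON object under the incoming/ prefix, keyed by publication date and article ID, while the run cursor lives at state/cursor.json. A minimal sketch of a single upload (the bucket name and article ID are placeholders):

import json
from datetime import datetime

import boto3

s3 = boto3.client("s3")

article = {
    "id": "abc123",  # placeholder; the real code hashes title+link when no ID exists
    "title": "Example headline",
    "published_date": datetime.utcnow().isoformat(),
    "content": "Summary text from the RSS entry...",
}

# Key layout: incoming/<YYYY-MM-DD>/<article_id>.json, matching upload_articles_to_s3
pub_day = datetime.fromisoformat(article["published_date"]).strftime("%Y-%m-%d")
key = f"incoming/{pub_day}/{article['id']}.json"

s3.put_object(
    Bucket="my-rag-demo-bucket",  # placeholder for the Terraform-created bucket
    Key=key,
    Body=json.dumps(article),
    ContentType="application/json",
)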

Scheduled Execution
#

# Create EventBridge rule
resource "aws_cloudwatch_event_rule" "schedule" {
  name                = var.eventbridge_rule_name
  description         = "Trigger fetcher Lambda at a regular schedule"
  schedule_expression = var.schedule_expression
  tags                = local.common_tags
}

# Target the fetcher Lambda from the EventBridge rule
resource "aws_cloudwatch_event_target" "fetcher_lambda" {
  rule      = aws_cloudwatch_event_rule.schedule.name
  target_id = "TriggerFetcherLambda"
  arn       = aws_lambda_function.fetcher.arn
}

The system uses EventBridge (formerly CloudWatch Events) to trigger the fetcher Lambda on a configurable schedule, defaulting to every 10 minutes. This regular execution ensures your knowledge base stays up-to-date with the latest news.

The Fetcher Lambda: Content Acquisition
#

The fetcher Lambda is responsible for retrieving articles from configurable news sources. Here’s the core functionality:

  1. Reads the last run timestamp to implement incremental processing
  2. Fetches and parses RSS feeds from Yahoo News
  3. Structures articles with metadata and content
  4. Uploads the processed articles to S3
  5. Invokes the syncer Lambda to trigger ingestion
def lambda_handler(event, context):
    # Step 1: Get the timestamp of the last successful run
    last_run = get_last_run_timestamp()
    
    # Step 2: Fetch and parse RSS entries published since the last run
    articles_raw = fetch_yahoo_news(last_run)
    
    # Step 3: Structure the entries and upload them to S3
    structured_articles = [
        structure_article(entry, feed_url) for entry, feed_url in articles_raw
    ]
    uploaded_keys = upload_articles_to_s3(structured_articles)
    
    # Step 4: Update the last run timestamp
    current_run = update_last_run_timestamp()
    
    # Step 5: Invoke the syncer Lambda
    if uploaded_keys:
        invoke_syncer(uploaded_keys)
    
    return {
        "statusCode": 200,
        "body": json.dumps(
            {
                "message": f"Successfully processed {len(structured_articles)} news articles",
                "article_count": len(structured_articles),
                "uploaded_keys": uploaded_keys,
                "time_range": {"from": last_run, "to": current_run},
            }
        ),
    }

The code implements an important optimization: incremental processing. By storing the last run timestamp, it only processes new articles since the previous execution, minimizing unnecessary processing and reducing costs.
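
Below is a minimal sketch of that incremental filter, assuming feedparser entries with RFC 822 published dates; note that the stored cursor has to be made timezone-aware before it can be compared with the parsed entry dates, as in the full listing later:

from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

def entries_since(entries, last_run_iso):
    """Keep only feed entries published after the stored cursor timestamp."""
    last_run = datetime.fromisoformat(last_run_iso)
    if last_run.tzinfo is None:
        last_run = last_run.replace(tzinfo=timezone.utc)  # cursor stored as naive UTC

    fresh = []
    for entry in entries:
        published = getattr(entry, "published", None)
        if not published:
            fresh.append(entry)  # no date: keep it to be safe
            continue
        try:
            if parsedate_to_datetime(published) > last_run:
                fresh.append(entry)
        except (TypeError, ValueError):
            fresh.append(entry)  # unparseable date: keep it to be safe
    return fresh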

The Syncer Lambda: Knowledge Base Integration
#

Once new articles are stored in S3, the syncer Lambda triggers an ingestion job in AWS Bedrock Knowledge Base:

def lambda_handler(event, context):
    # Extract information from the event
    uploaded_keys = event.get("uploaded_keys", [])
    
    # Verify uploaded files exist in S3
    valid_files = check_uploaded_files(uploaded_keys)
    
    # Start the ingestion job
    response = start_ingestion_job()
    
    if response and "ingestionJob" in response:
        job = response["ingestionJob"]
        
        # Create a detailed response
        return {
            "statusCode": 202,  # Accepted
            "body": json.dumps({
                "message": "Ingestion job started successfully",
                "job_id": job["ingestionJobId"],
                "status": job["status"],
                # Additional details...
            }),
        }

The syncer includes robust error handling and verification steps:

  1. It confirms that uploaded files actually exist in S3
  2. It handles various error conditions that might occur when starting an ingestion job
  3. It logs detailed information about the job for monitoring and debugging (a status-polling sketch follows below)
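
The Lambda only starts the job and returns; Bedrock processes it asynchronously. If you want to wait for completion, for example in a test script, here is a minimal polling sketch using the same bedrock-agent client (the knowledge base, data source, and job IDs are placeholders you would pass in):

import time

import boto3

bedrock_agent = boto3.client("bedrock-agent", region_name="us-east-1")

def wait_for_ingestion(kb_id, data_source_id, job_id, poll_seconds=15):
    """Poll a Bedrock Knowledge Base ingestion job until it reaches a terminal state."""
    while True:
        job = bedrock_agent.get_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=data_source_id,
            ingestionJobId=job_id,
        )["ingestionJob"]
        if job["status"] in ("COMPLETE", "FAILED", "STOPPED"):
            return job
        time.sleep(poll_seconds)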

Configuration Management and Flexibility
#

The solution uses a hierarchical configuration approach:

  1. Default values defined in Terraform variables
  2. Environment-specific overrides through variable files
  3. Runtime configuration passed as environment variables to Lambda functions

This approach makes the solution adaptable to different environments and use cases without code changes.

# Lambda environment variables
environment {
  variables = {
    BUCKET_NAME           = aws_s3_bucket.demo.id
    S3_PREFIX_INCOMING    = local.s3_prefix_incoming
    YAHOO_NEWS_FEEDS      = var.yahoo_news_feeds
    LAMBDA_AWS_REGION     = var.aws_region
    CURSOR_KEY            = local.cursor_key
    BATCH_SIZE            = var.demo_batch_size
    SYNCER_FUNCTION       = aws_lambda_function.syncer.function_name
    BEDROCK_KB_NAME       = var.bedrock_kb_name
    BEDROCK_KB_ID         = var.bedrock_kb_id
    BEDROCK_DATA_SOURCE_ID = var.bedrock_data_source_id
  }
}
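
Inside the functions, this configuration is read back as ordinary environment variables: required values are accessed strictly so the function fails fast if they are missing, and optional values fall back to defaults. A condensed sketch of the pattern used in the listings later:

import os

# Required settings: fail fast at import time if missing
BUCKET_NAME = os.environ["BUCKET_NAME"]
S3_PREFIX_INCOMING = os.environ["S3_PREFIX_INCOMING"]

# Optional settings: fall back to sensible defaults
AWS_REGION = os.environ.get("LAMBDA_AWS_REGION", "us-east-1")
YAHOO_NEWS_FEEDS = os.environ.get(
    "YAHOO_NEWS_FEEDS",
    "https://www.yahoo.com/news/rss,https://www.yahoo.com/news/world/rss",
)
BEDROCK_DATA_SOURCE_ID = os.environ.get("BEDROCK_DATA_SOURCE_ID", "")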

Operational Considerations
#

When deploying a system like this to production, there are several important considerations:

1. Content Source Limitations
#

RSS feeds typically provide summaries rather than full articles. For a more comprehensive knowledge base, you might need to:

  • Use web scraping (with proper permissions)
  • Consider paid news API services
  • Supplement with manual content curation

2. Rate Limiting and Quotas
#

The system implements basic rate limiting between feed fetches, but you should be aware of:

  • AWS Bedrock Knowledge Base ingestion quotas
  • RSS feed provider rate limits
  • AWS Lambda concurrency limits
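
If one of these limits is hit, for example when an ingestion job is requested while a previous one is still running, a simple retry with exponential backoff keeps the pipeline resilient. A minimal, generic sketch that is not tied to specific Bedrock error codes:

import random
import time

from botocore.exceptions import ClientError

def call_with_backoff(fn, max_attempts=5, base_delay=2.0):
    """Retry an AWS call with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except ClientError:
            if attempt == max_attempts:
                raise
            time.sleep(base_delay * (2 ** (attempt - 1)) + random.random())

# Hypothetical usage: retry starting an ingestion job
# call_with_backoff(lambda: bedrock_agent.start_ingestion_job(knowledgeBaseId=kb_id))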

3. Error Handling and Monitoring
#

The implementation includes:

  • Robust error handling with detailed logging
  • Job tracking for Bedrock ingestion jobs
  • Structured Lambda responses for easier troubleshooting

For production use, adding CloudWatch alarms and dashboards would provide better operational visibility.
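
In this stack, alarms would most naturally be declared in Terraform alongside the other resources (an aws_cloudwatch_metric_alarm resource). As an illustration of what to alarm on, here is a boto3 sketch that fires on any error reported by the fetcher Lambda; the function name and SNS topic ARN are placeholders:

import boto3

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_alarm(
    AlarmName="fetcher_lambda-errors",
    Namespace="AWS/Lambda",
    MetricName="Errors",
    Dimensions=[{"Name": "FunctionName", "Value": "fetcher_lambda"}],
    Statistic="Sum",
    Period=300,                      # evaluate error counts over 5-minute windows
    EvaluationPeriods=1,
    Threshold=1,
    ComparisonOperator="GreaterThanOrEqualToThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:ops-alerts"],  # placeholder topic
)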

Real-World Applications
#

This pipeline adapts to numerous business scenarios:

  • News Services: Keep your AI updated with the latest articles
  • Internal Knowledge Bases: Automatically ingest company documentation
  • Product Catalogs: Ensure your AI knows about new offerings
  • Customer Support: Feed in new troubleshooting procedures or FAQs

Extending the Solution
#

This implementation can be extended in several ways:

  1. Multiple Data Sources: Add support for different content sources beyond RSS feeds
  2. Content Preprocessing: Implement text summarization or entity extraction before ingestion
  3. Multi-Environment Support: Add deployment pipelines for dev/staging/prod environments
  4. Content Filtering: Add filtering logic to focus on specific topics or categories (a minimal sketch follows below)
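
As a sketch of that filtering idea, here is a simple keyword filter that could run between structuring and upload; the topic list is purely illustrative:

TOPIC_KEYWORDS = {"cloud", "ai", "security"}  # illustrative topics

def matches_topics(article, keywords=TOPIC_KEYWORDS):
    """Keep an article only if its title, content, or categories mention a keyword."""
    haystack = " ".join(
        [article.get("title", ""), article.get("content", "")]
        + article.get("categories", [])
    ).lower()
    return any(keyword in haystack for keyword in keywords)

# Applied between structuring and upload:
# filtered = [a for a in structured_articles if matches_topics(a)]
# uploaded_keys = upload_articles_to_s3(filtered)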

Taking It Further
#

While our demo uses Yahoo News feeds, you could extend this system to:

  1. Connect to your actual data sources
  2. Add preprocessing steps for document chunking and cleaning (a simple chunker is sketched after this list)
  3. Implement validation before ingestion
  4. Add monitoring and alerting for failed ingestion jobs
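
For the chunking step, here is a minimal fixed-size chunker with overlap that could run before upload_articles_to_s3; the sizes are arbitrary and would need tuning against your Knowledge Base's own chunking configuration:

def chunk_text(text, max_chars=2000, overlap=200):
    """Split text into overlapping character windows for ingestion."""
    if len(text) <= max_chars:
        return [text]
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + max_chars])
        start += max_chars - overlap
    return chunks

def explode_article(article):
    """Turn one structured article into one document per chunk."""
    return [
        {**article, "id": f"{article['id']}-{i}", "content": chunk}
        for i, chunk in enumerate(chunk_text(article["content"]))
    ]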

Conclusion
#

Building a reliable, automated RAG ingestion pipeline is essential for maintaining up-to-date AI applications. The solution presented here provides a solid foundation that you can adapt to your specific needs.

The architecture balances simplicity with robustness, using AWS managed services to minimize operational overhead while providing the flexibility needed for various use cases.

This automated RAG ingestion system demonstrates the power of connecting AWS services to keep AI systems current with minimal overhead. The serverless architecture ensures cost-effectiveness, while the scheduled nature guarantees freshness without manual intervention.

By implementing this pipeline, you’ll ensure your RAG system always has access to fresh, relevant data, significantly improving the quality of AI-generated responses as your data grows and evolves.


Appendix: Complete Source Code
#

# terraform/locals.tf
locals {
  # Generate a random suffix if bucket_name is not provided
  bucket_suffix = var.bucket_name == "" ? random_id.bucket_suffix[0].hex : ""
  bucket_name   = var.bucket_name == "" ? "${var.project}-rag-demo-${local.bucket_suffix}" : var.bucket_name
  
  # Common tags to apply to all resources
  common_tags = merge({
    Project     = var.project
    Environment = "demo"
    ManagedBy   = "Terraform"
  }, var.tags)
  
  # Lambda common settings
  lambda_runtime     = "python3.11"
  lambda_handler     = "main.lambda_handler"
  lambda_timeout     = 300
  lambda_memory_size = 128
  
  # S3 settings
  s3_prefix_incoming = var.s3_prefix_incoming
  cursor_key         = "state/cursor.json"
  
  # Lambda path - adjust based on your directory structure
  lambda_src_path = "${path.module}/../codebase/lambda"
}
# terraform/outputs.tf
output "bucket_name" {
  description = "Name of the S3 bucket created"
  value       = aws_s3_bucket.demo.id
}

output "fetcher_name" {
  description = "Name of the fetcher Lambda function"
  value       = aws_lambda_function.fetcher.function_name
}

output "syncer_name" {
  description = "Name of the syncer Lambda function"
  value       = aws_lambda_function.syncer.function_name
}

output "region" {
  description = "AWS region used for deployment"
  value       = var.aws_region
}

output "schedule_rule" {
  description = "Name of the EventBridge rule"
  value       = aws_cloudwatch_event_rule.schedule.name
}
# terraform/variables.tf
variable "project" {
  description = "Project name"
  type        = string
  default     = "kb-demo"
}

variable "aws_region" {
  description = "AWS region for deployment"
  type        = string
  default     = "us-east-1"
}

variable "aws_profile" {
  description = "AWS CLI profile to use"
  type        = string
  default     = "default"
}

variable "eventbridge_rule_name" {
  description = "Name for the EventBridge rule"
  type        = string
  default     = "rag_hourly_ingest_eb"
}

variable "fetcher_lambda_name" {
  description = "Name for the fetcher Lambda function"
  type        = string
  default     = "fetcher_lambda"
}

variable "yahoo_news_feeds" {
  description = "Comma-separated list of Yahoo News RSS feed URLs"
  type        = string
  default     = "https://www.yahoo.com/news/rss,https://www.yahoo.com/news/world/rss"
}

variable "syncer_lambda_name" {
  description = "Name for the syncer Lambda function"
  type        = string
  default     = "syncer_lambda"
}

variable "bedrock_kb_name" {
  description = "Name of the Bedrock Knowledge Base (informational only)"
  type        = string
  default     = "knowledge-base-quick-start-5d3h3"
}

variable "bedrock_kb_id" {
  description = "ID of the Bedrock Knowledge Base"
  type        = string
  sensitive   = true
}

variable "bedrock_data_source_id" {
  description = "ID of the Bedrock Knowledge Base Data Source (optional)"
  type        = string
  sensitive   = true
  default     = ""
}

variable "bucket_name" {
  description = "S3 bucket name (optional, will be auto-generated if not provided)"
  type        = string
  sensitive   = true
  default     = ""
}

variable "s3_prefix_incoming" {
  description = "S3 prefix for incoming documents"
  type        = string
  default     = "incoming/"
}

variable "schedule_expression" {
  description = "EventBridge schedule expression"
  type        = string
  default     = "rate(10 minutes)"
}

variable "demo_batch_size" {
  description = "Number of chunks to upload per fetcher invocation"
  type        = number
  default     = 1
}

variable "tags" {
  description = "Tags to apply to all resources"
  type        = map(string)
  default     = {}
}
# terraform/versions.tf
terraform {
  required_version = ">= 1.6"
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = ">= 5.50.0"
    }
    archive = {
      source  = "hashicorp/archive"
      version = ">= 2.3.0"
    }
    random = {
      source  = "hashicorp/random"
      version = ">= 3.5.0"
    }
  }
}
provider "aws" {
  region  = var.aws_region
  profile = var.aws_profile
}

# Random suffix for the S3 bucket name
resource "random_id" "bucket_suffix" {
  count       = var.bucket_name == "" ? 1 : 0
  byte_length = 4
}

# S3 Bucket
resource "aws_s3_bucket" "demo" {
  bucket = local.bucket_name
  tags   = local.common_tags
}

# Enable versioning for the S3 bucket
resource "aws_s3_bucket_versioning" "demo" {
  bucket = aws_s3_bucket.demo.id
  versioning_configuration {
    status = "Enabled"
  }
}

# Server-side encryption for the S3 bucket
resource "aws_s3_bucket_server_side_encryption_configuration" "demo" {
  bucket = aws_s3_bucket.demo.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

# Create EventBridge rule
resource "aws_cloudwatch_event_rule" "schedule" {
  name                = var.eventbridge_rule_name
  description         = "Trigger fetcher Lambda at a regular schedule"
  schedule_expression = var.schedule_expression
  tags                = local.common_tags
}

# Target the fetcher Lambda from the EventBridge rule
resource "aws_cloudwatch_event_target" "fetcher_lambda" {
  rule      = aws_cloudwatch_event_rule.schedule.name
  target_id = "TriggerFetcherLambda"
  arn       = aws_lambda_function.fetcher.arn
}

# Permission for EventBridge to invoke the fetcher Lambda
resource "aws_lambda_permission" "allow_eventbridge" {
  statement_id  = "AllowExecutionFromEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.fetcher.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.schedule.arn
}

# Create fetcher Lambda function
data "archive_file" "fetcher_zip" {
  type        = "zip"
  source_dir  = "${local.lambda_src_path}/fetcher"
  output_path = "${path.module}/fetcher.zip"
}

resource "aws_lambda_function" "fetcher" {
  function_name    = var.fetcher_lambda_name
  role             = aws_iam_role.fetcher_lambda.arn
  handler          = local.lambda_handler
  runtime          = local.lambda_runtime
  timeout          = local.lambda_timeout
  memory_size      = local.lambda_memory_size
  filename         = data.archive_file.fetcher_zip.output_path
  source_code_hash = data.archive_file.fetcher_zip.output_base64sha256

  environment {
    variables = {
      BUCKET_NAME           = aws_s3_bucket.demo.id
      S3_PREFIX_INCOMING    = local.s3_prefix_incoming
      CURSOR_KEY            = local.cursor_key
      BATCH_SIZE            = var.demo_batch_size
      SYNCER_FUNCTION       = var.syncer_lambda_name
      LAMBDA_AWS_REGION     = var.aws_region
      YAHOO_NEWS_FEEDS      = var.yahoo_news_feeds
      BEDROCK_KB_ID         = var.bedrock_kb_id
      BEDROCK_DATA_SOURCE_ID = var.bedrock_data_source_id
    }
  }

  tags = local.common_tags
}

# Create syncer Lambda function
data "archive_file" "syncer_zip" {
  type        = "zip"
  source_dir  = "${local.lambda_src_path}/syncer"
  output_path = "${path.module}/syncer.zip"
}

resource "aws_lambda_function" "syncer" {
  function_name    = var.syncer_lambda_name
  role             = aws_iam_role.syncer_lambda.arn
  handler          = local.lambda_handler
  runtime          = local.lambda_runtime
  timeout          = local.lambda_timeout
  memory_size      = local.lambda_memory_size
  filename         = data.archive_file.syncer_zip.output_path
  source_code_hash = data.archive_file.syncer_zip.output_base64sha256

  environment {
    variables = {
      BUCKET_NAME           = aws_s3_bucket.demo.id
      S3_PREFIX_INCOMING    = local.s3_prefix_incoming
      LAMBDA_AWS_REGION     = var.aws_region
      BEDROCK_KB_NAME       = var.bedrock_kb_name
      BEDROCK_KB_ID         = var.bedrock_kb_id
      BEDROCK_DATA_SOURCE_ID = var.bedrock_data_source_id
    }
  }

  tags = local.common_tags
}

# IAM Role for fetcher Lambda
resource "aws_iam_role" "fetcher_lambda" {
  name = "${var.fetcher_lambda_name}_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })

  tags = local.common_tags
}

# IAM Role for syncer Lambda
resource "aws_iam_role" "syncer_lambda" {
  name = "${var.syncer_lambda_name}_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })

  tags = local.common_tags
}

# CloudWatch Logs policy for fetcher Lambda
resource "aws_iam_policy" "fetcher_logs" {
  name        = "${var.fetcher_lambda_name}_logs_policy"
  description = "IAM policy for fetcher Lambda logging"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

# CloudWatch Logs policy for syncer Lambda
resource "aws_iam_policy" "syncer_logs" {
  name        = "${var.syncer_lambda_name}_logs_policy"
  description = "IAM policy for syncer Lambda logging"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

# S3 access policy for fetcher Lambda
resource "aws_iam_policy" "fetcher_s3" {
  name        = "${var.fetcher_lambda_name}_s3_policy"
  description = "IAM policy for fetcher Lambda S3 access"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:ListBucket"
        ]
        Resource = aws_s3_bucket.demo.arn
      },
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject",
          "s3:PutObjectTagging"
        ]
        Resource = "${aws_s3_bucket.demo.arn}/*"
      }
    ]
  })
}

# S3 access policy for syncer Lambda
resource "aws_iam_policy" "syncer_s3" {
  name        = "${var.syncer_lambda_name}_s3_policy"
  description = "IAM policy for syncer Lambda S3 access"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:ListBucket"
        ]
        Resource = aws_s3_bucket.demo.arn
      },
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject"
        ]
        Resource = "${aws_s3_bucket.demo.arn}/*"
      }
    ]
  })
}

# Lambda invoke policy for fetcher Lambda
resource "aws_iam_policy" "fetcher_invoke" {
  name        = "${var.fetcher_lambda_name}_invoke_policy"
  description = "IAM policy for fetcher Lambda to invoke syncer Lambda"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "lambda:InvokeFunction"
        ]
        Resource = aws_lambda_function.syncer.arn
      }
    ]
  })
}

# Bedrock KB access policy for syncer Lambda (only if KB ID is provided)
resource "aws_iam_policy" "syncer_bedrock" {
  count       = var.bedrock_kb_id != "" ? 1 : 0
  name        = "${var.syncer_lambda_name}_bedrock_policy"
  description = "IAM policy for syncer Lambda Bedrock KB access"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "bedrock:StartIngestionJob",
          "bedrock:GetIngestionJob",
          "bedrock:ListIngestionJobs"
        ]
        Resource = "arn:aws:bedrock:${var.aws_region}:${data.aws_caller_identity.current.account_id}:knowledge-base/${var.bedrock_kb_id}"
      }
    ]
  })
}

# Get AWS account ID
data "aws_caller_identity" "current" {}

# Attach policies to roles
resource "aws_iam_role_policy_attachment" "fetcher_logs" {
  role       = aws_iam_role.fetcher_lambda.name
  policy_arn = aws_iam_policy.fetcher_logs.arn
}

resource "aws_iam_role_policy_attachment" "fetcher_s3" {
  role       = aws_iam_role.fetcher_lambda.name
  policy_arn = aws_iam_policy.fetcher_s3.arn
}

resource "aws_iam_role_policy_attachment" "fetcher_invoke" {
  role       = aws_iam_role.fetcher_lambda.name
  policy_arn = aws_iam_policy.fetcher_invoke.arn
}

resource "aws_iam_role_policy_attachment" "syncer_logs" {
  role       = aws_iam_role.syncer_lambda.name
  policy_arn = aws_iam_policy.syncer_logs.arn
}

resource "aws_iam_role_policy_attachment" "syncer_s3" {
  role       = aws_iam_role.syncer_lambda.name
  policy_arn = aws_iam_policy.syncer_s3.arn
}

resource "aws_iam_role_policy_attachment" "syncer_bedrock" {
  count      = var.bedrock_kb_id != "" ? 1 : 0
  role       = aws_iam_role.syncer_lambda.name
  policy_arn = aws_iam_policy.syncer_bedrock[0].arn
}
#!/bin/bash
# cleanup.sh - Cleanup script for Terraform resources

# Set the AWS profile
export AWS_PROFILE=munnoo
export AWS_REGION=us-east-1

export TF_VAR_project="rag_hourly_ingest_claude"
export TF_VAR_aws_region="$AWS_REGION"
export TF_VAR_aws_profile="$AWS_PROFILE"

export TF_VAR_eventbridge_rule_name="rag_hourly_ingest_eb"
export TF_VAR_fetcher_lambda_name="fetcher_lambda"
export TF_VAR_syncer_lambda_name="syncer_lambda"

export TF_VAR_bedrock_kb_name="knowledge-base-quick-start-5d3h3"
export TF_VAR_bedrock_kb_id="8FYOHD0OOA"          # sensitive
export TF_VAR_bedrock_data_source_id="JHA7ZKEA48"           # optional, sensitive

export TF_VAR_bucket_name="khalidrizvi-rag-houry-ingest-claude"                      # optional fixed name (sensitive), else empty for auto-generate
export TF_VAR_s3_prefix_incoming="incoming/"
export TF_VAR_schedule_expression="rate(10 minutes)"
export TF_VAR_demo_batch_size=1

cd terraform
# Get outputs
REGION=$(terraform output -raw region)
BUCKET=$(terraform output -raw bucket_name)

echo "Emptying S3 bucket s3://${BUCKET} ..."
aws s3 rm "s3://${BUCKET}" --recursive --region "$REGION" --profile "$AWS_PROFILE"

echo "Destroying Terraform resources..."
terraform destroy -auto-approve

echo "Cleanup complete!"
#!/bin/bash
# deploy.sh - Environment setup and Terraform deployment

export AWS_PROFILE=profile
export AWS_REGION=region

export TF_VAR_project="rag_hourly_ingest_claude"
export TF_VAR_aws_region="$AWS_REGION"
export TF_VAR_aws_profile="$AWS_PROFILE"

export TF_VAR_eventbridge_rule_name="rag_hourly_ingest_eb"
export TF_VAR_fetcher_lambda_name="fetcher_lambda"
export TF_VAR_syncer_lambda_name="syncer_lambda"

export TF_VAR_bedrock_kb_name="dummy_kb"
export TF_VAR_bedrock_kb_id="kb_id"          # sensitive
export TF_VAR_bedrock_data_source_id="ds_id"           # optional, sensitive

export TF_VAR_bucket_name="s3_bucket_name"                      # optional fixed name (sensitive), else empty for auto-generate
export TF_VAR_s3_prefix_incoming="incoming/"
export TF_VAR_schedule_expression="rate(10 minutes)"
export TF_VAR_demo_batch_size=1

# Install dependencies for Lambda functions
echo "Installing Lambda dependencies..."
pip install -r requirements.txt --target ./codebase/lambda/fetcher/ --ignore-installed

# If syncer also needs dependencies, install them too
pip install -r requirements.txt --target ./codebase/lambda/syncer/ --ignore-installed

# Deploy instructions
cd terraform
terraform init
terraform plan -out plan.tfplan
terraform apply plan.tfplan

# Smoke test without waiting 10 minutes:
REGION=$(terraform output -raw region)
FETCHER=$(terraform output -raw fetcher_name)
# Note: with AWS CLI v2 you may need to add --cli-binary-format raw-in-base64-out
aws lambda invoke --function-name "$FETCHER" --payload '{}' out.json --region "$REGION" >/dev/null
cat out.json

BUCKET=$(terraform output -raw bucket_name)
aws s3 ls "s3://${BUCKET}/incoming/" --recursive --region "$REGION"

# View CloudWatch logs for the Lambda functions
echo "View fetcher Lambda logs:"
echo "aws logs filter-log-events --log-group-name /aws/lambda/$FETCHER --region $REGION"

echo "View syncer Lambda logs:"
echo "aws logs filter-log-events --log-group-name /aws/lambda/$(terraform output -raw syncer_name) --region $REGION"
# codebase/lambda/fetcher/main.py
import os
import boto3
import json
import logging
import feedparser
from datetime import datetime, timedelta, timezone
from email.utils import parsedate_to_datetime
from hashlib import md5
import time

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Environment variables
BUCKET_NAME = os.environ["BUCKET_NAME"]
S3_PREFIX_INCOMING = os.environ["S3_PREFIX_INCOMING"]
LAST_RUN_KEY = os.environ["CURSOR_KEY"]  # Repurposing cursor for timestamp
SYNCER_FUNCTION = os.environ["SYNCER_FUNCTION"]
YAHOO_NEWS_FEEDS = os.environ.get(
    "YAHOO_NEWS_FEEDS",
    "https://www.yahoo.com/news/rss,https://www.yahoo.com/news/world/rss",
)

# Initialize AWS clients
s3_client = boto3.client("s3")
lambda_client = boto3.client("lambda")


def get_last_run_timestamp():
    """Read the last run timestamp from S3."""
    try:
        response = s3_client.get_object(Bucket=BUCKET_NAME, Key=LAST_RUN_KEY)
        content = json.loads(response["Body"].read())
        return content.get("last_run", None)
    except s3_client.exceptions.NoSuchKey:
        # If this is our first run, use a timestamp from 1 hour ago
        default_time = (datetime.utcnow() - timedelta(hours=1)).isoformat()
        logger.info(f"No previous run record found, defaulting to {default_time}")
        return default_time
    except Exception as e:
        logger.error(f"Error reading last run timestamp: {str(e)}")
        # Fallback to 1 hour ago if there's any error
        return (datetime.utcnow() - timedelta(hours=1)).isoformat()


def update_last_run_timestamp():
    """Write the current timestamp to S3 as our new 'last run' marker."""
    current_time = datetime.utcnow().isoformat()
    content = {"last_run": current_time, "updated_at": current_time}
    s3_client.put_object(
        Bucket=BUCKET_NAME,
        Key=LAST_RUN_KEY,
        Body=json.dumps(content),
        ContentType="application/json",
    )
    logger.info(f"Updated last run timestamp to {current_time}")
    return current_time


def fetch_yahoo_news(last_run_timestamp):
    """Fetch news from Yahoo RSS feeds published since the last run."""
    feeds = YAHOO_NEWS_FEEDS.split(",")
    # The cursor is stored as naive UTC; make it timezone-aware so it can be
    # compared with the aware datetimes parsed from RSS entries
    last_run = datetime.fromisoformat(last_run_timestamp)
    if last_run.tzinfo is None:
        last_run = last_run.replace(tzinfo=timezone.utc)
    all_articles = []

    logger.info(f"Fetching news published after {last_run_timestamp}")

    for feed_url in feeds:
        try:
            logger.info(f"Processing feed: {feed_url}")
            # Add a small delay to avoid rate limiting
            time.sleep(1)

            # Parse the RSS feed
            feed = feedparser.parse(feed_url)

            if hasattr(feed, "status") and feed.status != 200:
                logger.warning(
                    f"Feed returned non-200 status: {feed.status}, URL: {feed_url}"
                )
                continue

            # Filter for entries newer than our last run
            new_entries = []
            for entry in feed.entries:
                # Yahoo RSS uses 'published' for the publication date
                if "published" in entry:
                    try:
                        # Parse RFC822 date format used in RSS
                        pub_date = parsedate_to_datetime(entry.published)

                        # Only keep entries newer than our last run
                        if pub_date > last_run:
                            new_entries.append(entry)
                    except Exception as e:
                        logger.warning(
                            f"Could not parse publication date: {entry.published}, error: {str(e)}"
                        )
                        # If we can't parse the date, include it to be safe
                        new_entries.append(entry)
                else:
                    # If there's no publication date, include it to be safe
                    new_entries.append(entry)

            logger.info(f"Found {len(new_entries)} new articles in {feed_url}")
            # Remember which feed each entry came from for structuring later
            all_articles.extend((entry, feed_url) for entry in new_entries)

        except Exception as e:
            logger.error(f"Error processing feed {feed_url}: {str(e)}")

    return all_articles


def structure_article(entry, source_url):
    """Convert a feedparser entry into a structured article for our knowledge base."""
    # Create a unique ID for the article
    if hasattr(entry, "id"):
        article_id = entry.id
    else:
        # If no ID exists, create one from the title and link
        article_id = md5((entry.title + entry.link).encode()).hexdigest()

    # Extract the publication date
    if hasattr(entry, "published"):
        try:
            pub_date = parsedate_to_datetime(entry.published).isoformat()
        except Exception:
            pub_date = datetime.utcnow().isoformat()
    else:
        pub_date = datetime.utcnow().isoformat()

    # Extract the content
    content = ""
    if hasattr(entry, "content"):
        content = entry.content[0].value
    elif hasattr(entry, "summary"):
        content = entry.summary
    elif hasattr(entry, "description"):
        content = entry.description

    # Create a structured article document
    article = {
        "id": article_id,
        "title": entry.title,
        "link": entry.link,
        "published_date": pub_date,
        "content": content,
        "source": {"name": "Yahoo News", "url": source_url},
        "fetched_at": datetime.utcnow().isoformat(),
    }

    # Add categories if available
    if hasattr(entry, "tags"):
        article["categories"] = [tag.term for tag in entry.tags]

    # Add author if available
    if hasattr(entry, "author"):
        article["author"] = entry.author

    return article


def upload_articles_to_s3(articles):
    """Upload structured news articles to S3."""
    uploaded_keys = []

    for article in articles:
        # Create a filename based on publication date and ID for easy sorting
        pub_date = datetime.fromisoformat(
            article["published_date"].replace("Z", "+00:00")
            if article["published_date"].endswith("Z")
            else article["published_date"]
        )
        filename = f"{pub_date.strftime('%Y-%m-%d')}/{article['id']}.json"

        s3_key = f"{S3_PREFIX_INCOMING}{filename}"

        # Upload to S3
        s3_client.put_object(
            Bucket=BUCKET_NAME,
            Key=s3_key,
            Body=json.dumps(article),
            ContentType="application/json",
        )

        uploaded_keys.append(s3_key)
        logger.info(
            f"Uploaded article: {article['title']} to s3://{BUCKET_NAME}/{s3_key}"
        )

    return uploaded_keys


def invoke_syncer(uploaded_keys):
    """Invoke the syncer Lambda with the list of uploaded keys."""
    if not uploaded_keys:
        logger.info("No articles uploaded, skipping syncer invocation")
        return

    payload = {
        "uploaded_keys": uploaded_keys,
        "source": "yahoo_news",
        "timestamp": datetime.utcnow().isoformat(),
    }

    try:
        response = lambda_client.invoke(
            FunctionName=SYNCER_FUNCTION,
            InvocationType="Event",  # Asynchronous invocation
            Payload=json.dumps(payload),
        )
        logger.info(f"Invoked syncer Lambda {SYNCER_FUNCTION} asynchronously")
        return response
    except Exception as e:
        logger.error(f"Error invoking syncer: {str(e)}")
        return None


def lambda_handler(event, context):
    """
    Main Lambda handler function:
    1. Get the timestamp of the last run
    2. Fetch Yahoo News articles published since then
    3. Structure and upload articles to S3
    4. Update the last run timestamp
    5. Trigger the syncer to update our knowledge base
    """
    logger.info(f"Yahoo News Fetcher Lambda triggered with event: {json.dumps(event)}")

    # Step 1: Get the last run timestamp
    last_run = get_last_run_timestamp()

    # Step 2: Fetch news from Yahoo
    articles_raw = fetch_yahoo_news(last_run)

    if not articles_raw:
        logger.info("No new articles found since the last run")
        # Still update the timestamp even if no new articles
        current_run = update_last_run_timestamp()
        return {
            "statusCode": 200,
            "body": json.dumps(
                {
                    "message": "No new articles found",
                    "time_range": {"from": last_run, "to": current_run},
                }
            ),
        }

    # Step 3: Structure articles and upload to S3
    # fetch_yahoo_news returns (entry, feed_url) pairs, so each article is
    # structured exactly once with the feed it came from
    structured_articles = [
        structure_article(entry, feed_url) for entry, feed_url in articles_raw
    ]

    uploaded_keys = upload_articles_to_s3(structured_articles)

    # Step 4: Update the last run timestamp
    current_run = update_last_run_timestamp()

    # Step 5: Invoke the syncer Lambda
    if uploaded_keys:
        invoke_syncer(uploaded_keys)

    return {
        "statusCode": 200,
        "body": json.dumps(
            {
                "message": f"Successfully processed {len(structured_articles)} news articles",
                "article_count": len(structured_articles),
                "uploaded_keys": uploaded_keys,
                "time_range": {"from": last_run, "to": current_run},
            }
        ),
    }
# codebase/lambda/syncer/main.py
import os
import boto3
import json
import logging
from botocore.exceptions import ClientError
from datetime import datetime

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Environment variables
BUCKET_NAME = os.environ["BUCKET_NAME"]
S3_PREFIX_INCOMING = os.environ["S3_PREFIX_INCOMING"]
AWS_REGION = os.environ.get("LAMBDA_AWS_REGION", "us-east-1")  # AWS_REGION itself is reserved by the Lambda runtime
BEDROCK_KB_NAME = os.environ["BEDROCK_KB_NAME"]
BEDROCK_KB_ID = os.environ["BEDROCK_KB_ID"]
BEDROCK_DATA_SOURCE_ID = os.environ.get("BEDROCK_DATA_SOURCE_ID", "")

# Initialize AWS clients
s3_client = boto3.client("s3", region_name=AWS_REGION)
bedrock_agent = boto3.client("bedrock-agent", region_name=AWS_REGION)


def start_ingestion_job():
    """Start a Bedrock Knowledge Base ingestion job."""
    if not BEDROCK_KB_ID:
        logger.warning("BEDROCK_KB_ID is not set, skipping ingestion")
        return None

    try:
        # Prepare params for the API call
        params = {
            "knowledgeBaseId": BEDROCK_KB_ID,
        }

        # Only add dataSourceId if it's provided
        if BEDROCK_DATA_SOURCE_ID:
            params["dataSourceId"] = BEDROCK_DATA_SOURCE_ID
            logger.info(
                f"Starting ingestion job for KB ID: {BEDROCK_KB_ID}, Data Source ID: {BEDROCK_DATA_SOURCE_ID}"
            )
        else:
            logger.info(
                f"Starting ingestion job for KB ID: {BEDROCK_KB_ID} (no Data Source ID provided)"
            )

        # Log attempt to start ingestion
        logger.info(
            f"Attempting to start ingestion job with params: {json.dumps(params)}"
        )

        # Call the Bedrock API
        response = bedrock_agent.start_ingestion_job(**params)

        # Log success
        job_id = response["ingestionJob"]["ingestionJobId"]
        status = response["ingestionJob"]["status"]
        logger.info(
            f"Ingestion job started successfully: {job_id}, initial status: {status}"
        )

        # Record additional details about the job
        try:
            job_details = {
                "job_id": job_id,
                "status": status,
                "started_at": datetime.utcnow().isoformat(),
                "kb_id": BEDROCK_KB_ID,
                "kb_name": BEDROCK_KB_NAME,
            }

            # Store job details in S3 for tracking purposes
            s3_client.put_object(
                Bucket=BUCKET_NAME,
                Key=f"jobs/{job_id}.json",
                Body=json.dumps(job_details),
                ContentType="application/json",
            )
        except Exception as e:
            # Non-critical error - job is still running even if we can't save details
            logger.warning(f"Could not save job details to S3: {str(e)}")

        return response

    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        logger.error(f"Error starting ingestion job: {error_code} - {error_message}")
        logger.error(f"Full error response: {json.dumps(e.response)}")
        return {"error": error_code, "message": error_message}
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return {"error": "UnexpectedException", "message": str(e)}


def check_uploaded_files(uploaded_keys):
    """Verify that the uploaded files exist in S3 and log information about them."""
    if not uploaded_keys:
        logger.info("No uploaded files to check")
        return 0

    valid_files = 0
    for s3_key in uploaded_keys:
        try:
            # Check if the file exists
            response = s3_client.head_object(Bucket=BUCKET_NAME, Key=s3_key)
            size = response.get("ContentLength", 0)
            last_modified = (
                response.get("LastModified", "").isoformat()
                if hasattr(response.get("LastModified", ""), "isoformat")
                else "unknown"
            )

            logger.info(
                f"Verified file exists: s3://{BUCKET_NAME}/{s3_key}, Size: {size} bytes, Last Modified: {last_modified}"
            )
            valid_files += 1
        except Exception as e:
            logger.warning(
                f"File not found or cannot be accessed: s3://{BUCKET_NAME}/{s3_key}, Error: {str(e)}"
            )

    logger.info(f"Verified {valid_files} out of {len(uploaded_keys)} files")
    return valid_files


def lambda_handler(event, context):
    """
    Main Lambda handler function.
    1. Processes the event from the fetcher Lambda
    2. Validates uploaded files exist in S3
    3. Starts a Bedrock KB ingestion job
    """
    logger.info(f"Syncer Lambda triggered with event: {json.dumps(event)}")

    # Extract information from the event
    uploaded_keys = event.get("uploaded_keys", [])
    source = event.get("source", "unknown")
    timestamp = event.get("timestamp", datetime.utcnow().isoformat())

    # Log information about the files being ingested
    logger.info(
        f"Preparing to process {len(uploaded_keys)} files from source: {source}"
    )

    # Verify uploaded files exist in S3
    valid_files = check_uploaded_files(uploaded_keys)
    if valid_files == 0 and len(uploaded_keys) > 0:
        logger.warning("None of the specified files could be found in S3")
        return {
            "statusCode": 404,
            "body": json.dumps(
                {
                    "message": "No valid files found to ingest",
                    "uploaded_keys": uploaded_keys,
                }
            ),
        }

    # Start the ingestion job
    response = start_ingestion_job()

    if response and "ingestionJob" in response:
        job = response["ingestionJob"]

        # Create a detailed response with information about what's being ingested
        return {
            "statusCode": 202,  # Accepted
            "body": json.dumps(
                {
                    "message": "Ingestion job started successfully",
                    "job_id": job["ingestionJobId"],
                    "status": job["status"],
                    "kb_name": BEDROCK_KB_NAME,
                    "kb_id": BEDROCK_KB_ID,
                    "data_source_id": BEDROCK_DATA_SOURCE_ID or "Not specified",
                    "files_processed": valid_files,
                    "source": source,
                    "timestamp": timestamp,
                }
            ),
        }
    elif response and "error" in response:
        return {
            "statusCode": 500,
            "body": json.dumps(
                {
                    "message": "Failed to start ingestion job",
                    "error": response["error"],
                    "details": response["message"],
                    "files_processed": valid_files,
                    "source": source,
                }
            ),
        }
    else:
        return {
            "statusCode": 204,  # No Content
            "body": json.dumps(
                {
                    "message": "No ingestion job was started (BEDROCK_KB_ID not configured)",
                    "files_processed": valid_files,
                    "source": source,
                }
            ),
        }

Important Considerations
#

  1. No API Key Required: Unlike many news services, Yahoo News RSS feeds don’t require authentication, making this solution simple to deploy.

  2. Rate Limiting: I’ve added a small delay between feed fetches to avoid potential rate limiting, but Yahoo doesn’t publish specific limits for their RSS feeds.

  3. Content Limitations: RSS feeds typically provide summaries rather than full articles. If you need complete article content, you might need to:

    • Use a web scraping approach (with proper permissions)
    • Consider a paid news API service

  4. Historical Data: RSS feeds only contain recent articles. For historical data going back to 1999, you would need:

    • A one-time data import from an archive service
    • A commercial news database subscription

  5. Error Handling: The code includes robust error handling to ensure the function continues running even if individual feeds fail.

  6. Time Zones: RSS dates use the RFC 822 format, which the function parses into timezone-aware timestamps (see the short example below).
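
For reference, this is how an RFC 822 date from an RSS entry becomes a timezone-aware datetime (the sample date string is illustrative):

from email.utils import parsedate_to_datetime

# Typical RSS pubDate value (illustrative)
published = "Mon, 06 May 2024 14:30:00 +0000"

dt = parsedate_to_datetime(published)
print(dt.isoformat())  # 2024-05-06T14:30:00+00:00
print(dt.tzinfo)       # UTC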

This implementation gives you a solid foundation for keeping your knowledge base updated with the latest news from Yahoo, without requiring API keys or paid services. The fetcher is designed to be reliable and efficient, and to work seamlessly with the syncer function so your RAG system always draws on an up-to-date knowledge base.