In this blog, we'll explore an end-to-end data processing automation workflow using AWS Glue, S3, and Lambda. We'll cover uploading objects to S3, triggering Lambda functions via S3 event notifications, running AWS Glue jobs, and storing transformed data back into S3 and a database.
Prerequisites
Before we begin, ensure you have the following:
AWS account with necessary permissions
AWS CLI configured
Basic knowledge of AWS services (S3, Lambda, Glue, IAM)
Step 1: Setting Up Your S3 Bucket
Create an S3 Bucket:
Go to the S3 service in your AWS console.
Click "Create bucket."
Enter a unique bucket name and select the region.
Leave other settings as default and create the bucket.
Upload a Sample File:
Click on your newly created bucket.
Click "Upload" and add a sample CSV file.
Step 2: Setting Up S3 Event Notifications
Configure S3 Event Notifications:
Go to your S3 bucket and click on "Properties."
Scroll down to "Event notifications" and click "Create event notification."
Enter a name for the event.
Select "All object create events."
Choose "Lambda function" as the destination and select or create your Lambda function (we'll create this next).
Step 3: Creating Your Lambda Function
Create a Lambda Function:
Go to the Lambda service in your AWS console.
Click "Create function."
Select "Author from scratch."
Enter a function name and choose a runtime (e.g., Python 3.8).
Click "Create function."
Add Environment Variables:
Go to your Lambda function and click on "Configuration."
Click "Environment variables" and add the following:
GLUE_JOB_NAME: The name of your Glue job.
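Equivalently, the variable can be set from code; the function name is a placeholder and My-Testing-Glue-Job is just the job name used later in this post.

import boto3

lambda_client = boto3.client("lambda")

lambda_client.update_function_configuration(
    FunctionName="start-glue-job",  # placeholder name from the previous step
    Environment={"Variables": {"GLUE_JOB_NAME": "My-Testing-Glue-Job"}},
)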
Write the Lambda Function Code:
import os

import boto3


def lambda_handler(event, context):
    glue = boto3.client('glue')
    # Read the Glue job name from the environment variable set in the previous step.
    glue_job_name = os.environ.get('GLUE_JOB_NAME', 'My-Testing-Glue-Job')

    for record in event['Records']:
        s3_bucket = record['s3']['bucket']['name']
        s3_key = record['s3']['object']['key']

        # Extract the filename from the S3 key
        file_name = s3_key.split('/')[-1]

        # Set default values
        env = 'qa'
        output_file_path = 's3://automation/'

        # Check conditions based on keywords in the input file path
        if 'QA' in s3_key:
            env = 'qa'
            output_file_path = 's3://automation/'
        elif 'STAGING' in s3_key:
            env = 'staging'
            output_file_path = ''  # set your staging output path here
        elif 'PROD' in s3_key:
            env = 'prod'
            output_file_path = 's3://automation/'
        elif 'DEV' in s3_key:
            env = 'dev'
            output_file_path = 's3://automation/'
        # Add additional conditions for other environments if needed

        # Define job parameters based on the conditions
        job_parameters = {
            '--filename': file_name,
            '--env': env,
            '--output_file_path': output_file_path,
            '--input_file_path': f's3://{s3_bucket}/{s3_key}'
        }

        # Trigger the AWS Glue job with job parameters
        response = glue.start_job_run(
            JobName=glue_job_name,
            Arguments=job_parameters
        )
        print(f"Triggered Glue Job: {glue_job_name} with parameters: {job_parameters}")
        # You can add error handling and logging as needed.
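To sanity-check the handler locally before wiring up the trigger, you can feed it a trimmed-down S3 event containing only the fields it reads. The bucket name and key below are placeholders, and note that running this will actually call start_job_run, so it needs valid credentials and an existing job.

# Minimal stand-in for the event S3 sends on object creation.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-automation-demo-bucket"},
                "object": {"key": "QA/sample.csv"},
            }
        }
    ]
}

lambda_handler(sample_event, None)  # should print the triggered job and its parameters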
Step 4: Setting Up AWS Glue
Create a Glue Database:
Go to the Glue service in your AWS console.
Click "Databases" and then "Add database."
Enter a database name and click "Create."
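Or from code, with a placeholder database name:

import boto3

glue = boto3.client("glue")
glue.create_database(DatabaseInput={"Name": "automation_db"})  # placeholder name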
Create a Glue Job:
Go to "Jobs" and click "Add job."
Enter a job name.
Choose "IAM Role" with necessary permissions.
Select your source and target data stores (S3 buckets).
Write your ETL script (example below).
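Creating the job programmatically looks roughly like this; the role ARN, script location, and Glue version are placeholders you'd adjust to your setup.

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="My-Testing-Glue-Job",
    Role="arn:aws:iam::123456789012:role/glue-etl-role",            # placeholder role ARN
    Command={
        "Name": "glueetl",                                          # Spark ETL job
        "ScriptLocation": "s3://automation/scripts/etl_script.py",  # placeholder script path
        "PythonVersion": "3",
    },
    GlueVersion="3.0",
    DefaultArguments={"--job-language": "python"},
)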
Step 5: Writing the Glue ETL Script
Sample ETL Script:
import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Load data from S3
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="your_database",
    table_name="your_table",
    transformation_ctx="datasource0"
)

# Transform data
transformed_df = datasource0.toDF().withColumnRenamed("old_column_name", "new_column_name")

# Save transformed data back to S3
transformed_df.write.mode("overwrite").format("csv").save("s3://your_bucket/transformed/")

job.commit()
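Note that the Lambda from Step 3 passes --filename, --env, --input_file_path, and --output_file_path as job arguments, while the sample script above reads from a catalog table. If you'd rather drive the job off those arguments, one way (a sketch building on the script above, not the only option) is to resolve them alongside JOB_NAME:

# Resolve the extra arguments passed by the Lambda trigger.
args = getResolvedOptions(
    sys.argv,
    ["JOB_NAME", "filename", "env", "input_file_path", "output_file_path"],
)

# Read the uploaded file directly and write environment-specific output.
input_df = spark.read.option("header", "true").csv(args["input_file_path"])
input_df.write.mode("overwrite").csv(args["output_file_path"] + "transformed/")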
Step 6: Triggering Another Lambda Function with S3 Event Notifications
Configure Second S3 Event Notification:
Repeat Step 2 to set up another event notification for the transformed data folder in S3.
Create and configure another Lambda function to handle this event.
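If you script this notification too, keep in mind that put_bucket_notification_configuration replaces the bucket's entire notification configuration, so include both rules in one call. A prefix filter keeps the new trigger scoped to the transformed/ folder; names and ARNs below are placeholders.

import boto3

s3 = boto3.client("s3")

# This call overwrites the existing configuration, so the rule from Step 2
# is repeated here alongside the new one.
s3.put_bucket_notification_configuration(
    Bucket="my-automation-demo-bucket",
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {
                "Id": "trigger-glue-on-upload",
                "LambdaFunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:start-glue-job",
                "Events": ["s3:ObjectCreated:*"],
            },
            {
                "Id": "trigger-load-job",
                "LambdaFunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:start-load-job",
                "Events": ["s3:ObjectCreated:*"],
                # Only fire for objects written under the transformed/ prefix.
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "prefix", "Value": "transformed/"}]}
                },
            },
        ]
    },
)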
Second Lambda Function Code:
import os

import boto3


def lambda_handler(event, context):
    glue_client = boto3.client('glue')
    glue_job_name = os.environ['GLUE_JOB_NAME']
    response = glue_client.start_job_run(JobName=glue_job_name)
    return {
        'statusCode': 200,
        'body': response
    }
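To confirm the run actually started, you can look it up with the JobRunId returned by start_job_run. A quick sketch with a placeholder job name:

import boto3

glue = boto3.client("glue")

run = glue.start_job_run(JobName="My-Final-Glue-Job")  # placeholder job name
status = glue.get_job_run(JobName="My-Final-Glue-Job", RunId=run["JobRunId"])
print(status["JobRun"]["JobRunState"])  # e.g., RUNNING, SUCCEEDED, FAILED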
Step 7: Final Glue Job to Insert Data into a Database
Write the Final Glue ETL Script:
import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Load transformed data from S3
datasource0 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://your_bucket/transformed/"]},
    format="csv",
    format_options={"withHeader": True}  # treat the first row as column names
)

# Insert data into a database (Redshift in this example)
glueContext.write_dynamic_frame.from_options(
    frame=datasource0,
    connection_type="redshift",
    connection_options={
        "url": "jdbc:redshift://your_cluster:5439/yourdb",
        "dbtable": "your_table",
        "user": "your_user",
        "password": "your_password",
        "redshiftTmpDir": "s3://your_bucket/temp/"  # Glue stages Redshift loads through S3
    }
)

job.commit()
Conclusion
With this setup, you've automated a full data processing workflow using AWS Glue, S3, and Lambda. When you upload a file to S3, it triggers a series of events and transformations, ultimately inserting data into your database.
Feel free to customize the scripts and configurations to fit your specific use case. Happy automating!