Automating Data Processing with AWS Glue, S3, and Lambda

In this blog, we'll explore an end-to-end data processing automation workflow using AWS Glue, S3, and Lambda. We'll cover uploading objects to S3, triggering Lambda functions via S3 event notifications, running AWS Glue jobs, and storing transformed data back into S3 and a database.

Prerequisites

Before we begin, ensure you have the following:

  • AWS account with necessary permissions

  • AWS CLI configured

  • Basic knowledge of AWS services (S3, Lambda, Glue, IAM)

Step 1: Setting Up Your S3 Bucket

  1. Create an S3 Bucket:

    • Go to the S3 service in your AWS console.

    • Click "Create bucket."

    • Enter a unique bucket name and select the region.

    • Leave other settings as default and create the bucket.

  2. Upload a Sample File:

    • Click on your newly created bucket.

    • Click "Upload" and add a sample CSV file.

Step 2: Setting Up S3 Event Notifications

  1. Configure S3 Event Notifications:

    • Go to your S3 bucket and click on "Properties."

    • Scroll down to "Event notifications" and click "Create event notification."

    • Enter a name for the event.

    • Select "All object create events."

    • Choose "Lambda function" as the destination and select or create your Lambda function (we'll create this next).

Step 3: Creating Your Lambda Function

  1. Create a Lambda Function:

    • Go to the Lambda service in your AWS console.

    • Click "Create function."

    • Select "Author from scratch."

    • Enter a function name and choose a runtime (e.g., Python 3.12).

    • Click "Create function."

  2. Add Environment Variables:

    • Go to your Lambda function and click on "Configuration."

    • Click "Environment variables" and add the following:

      • GLUE_JOB_NAME: The name of your Glue job.

  3. Write the Lambda Function Code:

     import os
     import boto3

     def lambda_handler(event, context):
         glue = boto3.client('glue')

         for record in event['Records']:
             s3_bucket = record['s3']['bucket']['name']
             # Note: keys in real S3 events are URL-encoded; use urllib.parse.unquote_plus if your keys contain special characters
             s3_key = record['s3']['object']['key']

             # Extract the filename from the S3 key
             file_name = s3_key.split('/')[-1]

             # Set default env values
             env = 'qa'
             output_file_path = 's3://automation/'

             # Check conditions based on keywords in the input file path
             if 'QA' in s3_key:
                 env = 'qa'
                 output_file_path = 's3://automation/'
             elif 'STAGING' in s3_key:
                 env = 'staging'
                 output_file_path = ''
             elif 'PROD' in s3_key:
                 env = 'prod'
                 output_file_path = 's3://automation/'
             elif 'DEV' in s3_key:
                 env = 'dev'
                 output_file_path = 's3://automation/'
             # Add additional conditions for other environments if needed

             # Define job parameters based on the conditions
             job_parameters = {
                 '--filename': file_name,
                 '--env': env,
                 '--output_file_path': output_file_path,
                 '--input_file_path': f's3://{s3_bucket}/{s3_key}'
             }

             # Read the Glue job name from the environment variable set above,
             # falling back to the example job name if it isn't set
             glue_job_name = os.environ.get('GLUE_JOB_NAME', 'My-Testing-Glue-Job')

             # Trigger the AWS Glue job with job parameters
             response = glue.start_job_run(
                 JobName=glue_job_name,
                 Arguments=job_parameters
             )

             print(f"Triggered Glue Job: {glue_job_name} with parameters: {job_parameters}")

             # You can add error handling and logging as needed.
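
To sanity-check the handler before wiring up the trigger, you can call it with a hand-built event that mirrors the S3 notification shape (the bucket and key below are placeholders; note that this really calls start_job_run, so the Glue job must already exist):

     # Minimal local invocation with a fake S3 event
     sample_event = {
         'Records': [{
             's3': {
                 'bucket': {'name': 'my-automation-bucket'},
                 'object': {'key': 'QA/input/sample.csv'}
             }
         }]
     }

     lambda_handler(sample_event, None)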
    

Step 4: Setting Up AWS Glue

  1. Create a Glue Database:

    • Go to the Glue service in your AWS console.

    • Click "Databases" and then "Add database."

    • Enter a database name and click "Create."

  2. Create a Glue Job:

    • Go to "Jobs" and click "Add job."

    • Enter a job name.

    • Choose "IAM Role" with necessary permissions.

    • Select your source and target data stores (S3 buckets).

    • Write your ETL script (an example follows in Step 5). A boto3 sketch for creating the database and job programmatically is shown after this list.
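
If you prefer to create the database and job programmatically, here is a minimal boto3 sketch; the role and script location are placeholders, while the database and job names match the ones used elsewhere in this post:

     import boto3

     glue = boto3.client('glue')

     # Create the Glue database
     glue.create_database(DatabaseInput={'Name': 'your_database'})

     # Create the Glue job that the Lambda function triggers
     glue.create_job(
         Name='My-Testing-Glue-Job',
         Role='my-glue-service-role',  # IAM role with Glue and S3 permissions
         Command={
             'Name': 'glueetl',
             'ScriptLocation': 's3://my-automation-bucket/scripts/etl_script.py',
             'PythonVersion': '3'
         },
         GlueVersion='4.0'
     )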

Step 5: Writing the Glue ETL Script

  1. Sample ETL Script:

     import sys
     from awsglue.transforms import *
     from awsglue.utils import getResolvedOptions
     from pyspark.context import SparkContext
     from awsglue.context import GlueContext
     from awsglue.job import Job
    
     # Resolve the standard job name plus the parameters passed by the Lambda function
     args = getResolvedOptions(sys.argv, ['JOB_NAME', 'filename', 'env', 'input_file_path', 'output_file_path'])
     sc = SparkContext()
     glueContext = GlueContext(sc)
     spark = glueContext.spark_session
     job = Job(glueContext)
     job.init(args['JOB_NAME'], args)

     # args['input_file_path'], args['env'], and args['filename'] are also available for use in the transform

     # Load data from the Glue Data Catalog (the table must already exist; see the crawler sketch below)
     datasource0 = glueContext.create_dynamic_frame.from_catalog(
         database="your_database",
         table_name="your_table",
         transformation_ctx="datasource0"
     )

     # Transform data
     transformed_df = datasource0.toDF().withColumnRenamed("old_column_name", "new_column_name")

     # Save transformed data back to S3, under the output path passed in by the Lambda function
     transformed_df.write.mode("overwrite").format("csv").save(args['output_file_path'] + 'transformed/')
    
     job.commit()
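
The catalog load in this script assumes the source table already exists in your Glue database. If it doesn't, a crawler can create it from the raw files; a minimal boto3 sketch, with placeholder names for the crawler, role, and path:

     import boto3

     glue = boto3.client('glue')

     # Create a crawler that catalogs the raw CSV files
     glue.create_crawler(
         Name='my-automation-crawler',
         Role='my-glue-service-role',
         DatabaseName='your_database',
         Targets={'S3Targets': [{'Path': 's3://my-automation-bucket/QA/input/'}]}
     )

     # Run it once so the table is available before the ETL job runs
     glue.start_crawler(Name='my-automation-crawler')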
    

Step 6: Triggering Another Lambda Function with S3 Event Notifications

  1. Configure Second S3 Event Notification:

    • Repeat Step 2 to set up another event notification, scoped to the transformed data folder in S3 (a prefix-filtered configuration is sketched at the end of this step).

    • Create and configure another Lambda function to handle this event.

  2. Second Lambda Function Code:

     import boto3
     import os

     def lambda_handler(event, context):
         glue_client = boto3.client('glue')

         # The job name comes from the GLUE_JOB_NAME environment variable (see Step 3)
         glue_job_name = os.environ['GLUE_JOB_NAME']

         # Start the final Glue job that loads the transformed data into the database
         response = glue_client.start_job_run(JobName=glue_job_name)

         return {
             'statusCode': 200,
             'body': response['JobRunId']
         }
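
Because the transformed files may land in the same bucket as the raw uploads, it helps to scope this second notification to the transformed/ prefix so the two triggers don't overlap. A minimal boto3 sketch, using the same placeholder names as earlier (remember that this call replaces the bucket's existing notification configuration, so in practice both configurations go in a single call):

     import boto3

     s3 = boto3.client('s3')

     # Fire the second Lambda function only for objects created under transformed/
     s3.put_bucket_notification_configuration(
         Bucket='my-automation-bucket',
         NotificationConfiguration={
             'LambdaFunctionConfigurations': [{
                 'LambdaFunctionArn': 'arn:aws:lambda:us-west-2:123456789012:function:my-second-function',
                 'Events': ['s3:ObjectCreated:*'],
                 'Filter': {
                     'Key': {
                         'FilterRules': [{'Name': 'prefix', 'Value': 'transformed/'}]
                     }
                 }
             }]
         }
     )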
    

Step 7: Final Glue Job to Insert Data into a Database

  1. Write the Final Glue ETL Script:

     import sys
     from awsglue.transforms import *
     from awsglue.utils import getResolvedOptions
     from pyspark.context import SparkContext
     from awsglue.context import GlueContext
     from awsglue.job import Job
     import boto3
    
     # 'TempDir' is the job's temporary S3 directory; it is used below as the Redshift staging location
     args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
     sc = SparkContext()
     glueContext = GlueContext(sc)
     spark = glueContext.spark_session
     job = Job(glueContext)
     job.init(args['JOB_NAME'], args)
    
     # Load transformed data from S3
     datasource0 = glueContext.create_dynamic_frame.from_options(
         connection_type="s3",
         connection_options={"paths": ["s3://your_bucket/transformed/"]},
         format="csv"
     )
    
     # Insert data into a database (Amazon Redshift in this example)
     glueContext.write_dynamic_frame.from_options(
         frame=datasource0,
         connection_type="redshift",
         connection_options={
             "url": "jdbc:redshift://your_cluster:5439/yourdb",
             "dbtable": "your_table",
             "user": "your_user",
             "password": "your_password",        # prefer a Glue connection or Secrets Manager over hard-coded credentials
             "redshiftTmpDir": args["TempDir"]   # S3 staging location Glue uses when loading into Redshift
         }
     )
    
     job.commit()
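
If you would rather not embed credentials in the script, Glue can also write through a catalog connection. A brief sketch that replaces the from_options call above, assuming a hypothetical connection named "my-redshift-connection" has been created under Glue connections:

     # Write via a pre-created Glue connection instead of raw JDBC credentials
     glueContext.write_dynamic_frame.from_jdbc_conf(
         frame=datasource0,
         catalog_connection="my-redshift-connection",
         connection_options={"dbtable": "your_table", "database": "yourdb"},
         redshift_tmp_dir=args["TempDir"]
     )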
    

Conclusion

With this setup, you've automated a full data processing workflow using AWS Glue, S3, and Lambda. When you upload a file to S3, it triggers a series of events and transformations, ultimately inserting data into your database.

Feel free to customize the scripts and configurations to fit your specific use case. Happy automating!
