Amazon EMR Cluster to Athena Partitioned Data - Quickly and Simply!

August 2, 2018

 

 

How to ask the friendly Amazon goddess Athena to fetch your partitioned data from a Hadoop cluster

 

 

 

Are you unsure whether you need a whole Hadoop cluster for just a few Hive tables?

Do you query your data only occasionally?

Do you want to save money on server maintenance and move your rarely used data to the cloud?

 

Amazon thought about this and offers a serverless service: Amazon Athena.

The short description on the Amazon Athena web page gives us a good starting point:

  • An interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL.

  • There is no infrastructure to manage, and you pay only for the queries that you run.

  • Put your data in Amazon S3, define the schema, and start querying using standard SQL – simply.

  • Using Glue’s fully-managed ETL capabilities to transform data or convert it into columnar formats to optimize cost and improve performance.

 

Amazon Athena uses a pay-per-query pricing policy:

At the time of writing, it's 5 dollars per 1 TB of data scanned. So to reduce your costs, you mostly need to implement a partitioned schema for big tables, so that queries filtering on the partition columns scan only the matching partitions.
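
To get a feel for what partitioning can save, here is a rough sketch of the arithmetic. It assumes the 5 dollars per TB price quoted above and treats 1 TB as 2^40 bytes; the function name estimate_cost_usd and the sample sizes are made up for illustration, and a real bill may differ because of rounding and minimum charges that this sketch ignores.

```shell
#!/bin/bash
# Hypothetical helper: estimate the Athena cost of scanning a number of bytes.
# Assumptions: 5 USD per TB scanned (the price quoted above), 1 TB = 2^40 bytes.
estimate_cost_usd() {
    awk -v bytes="$1" 'BEGIN { printf "%.2f\n", bytes * 5 / 1099511627776 }'
}

# Compare scanning a full 1 TB table with scanning one 3 GB partition.
full_table=$(estimate_cost_usd 1099511627776)
one_partition=$(estimate_cost_usd 3221225472)
echo "full scan: \$${full_table}, one partition: \$${one_partition}"
```

The point is only the ratio: a query that touches a single daily partition pays for that partition, not for the whole table.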

But how can we store partitioned data automatically? Here I'll try to describe a relatively simple process to achieve this goal.

 

One of our customers has a Hadoop cluster (in this case an AWS EMR cluster, Elastic MapReduce, in the Amazon cloud) and wants to store a few partitioned tables in an S3 bucket with the ability to query them from time to time.

 

Here is the process in short:

 

  1. Create a non-partitioned Hive table to store your source data.

  2. Create a partitioned Hive table.

  3. Create a partitioned Athena table.

  4. Insert data into the partitioned Hive table from the source table.

  5. Copy the data files to a local directory.

  6. Rename the files if needed and synchronize the directory from the previous step to an Amazon S3 bucket.

  7. Load the Athena table partitions automatically.

 

Let's go through each step and do the magic!

 

  1. A non-partitioned table will be the source for the partitioned data, because its definition doesn't need to include the partition columns. Our partitions will be based on year, month and day, derived from a timestamp column:

 

CREATE EXTERNAL TABLE IF NOT EXISTS cloudfront_logs
    (
        log_date TIMESTAMP,
        user_id STRING,
        page_path STRING,
        referer STRING,
        visitor_id STRING,
        ip STRING,
        session_id STRING,
        operating_sys STRING,
        keyword STRING
    )
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS PARQUET
LOCATION '/user/admin/nonpartitioned';

 

2. The partitioned Hive table includes three partition columns, as described in the previous step:

 

SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

CREATE EXTERNAL TABLE IF NOT EXISTS cloudfront_logs_part
    (
        log_date TIMESTAMP,
        user_id STRING,
        page_path STRING,
        referer STRING,
        visitor_id STRING,
        ip STRING,
        session_id STRING,
        operating_sys STRING,
        keyword STRING
    )
PARTITIONED BY
    (
        `year` STRING,
        `month` STRING,
        `day` STRING
    )
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS PARQUET
LOCATION '/user/admin/partitioned';

 

3. The partitioned Athena table includes the same three partition columns as the partitioned Hive table:

 

CREATE EXTERNAL TABLE IF NOT EXISTS cloudfront_logs_ath (
    log_date STRING,
    user_id STRING,
    page_path STRING,
    referer STRING,
    visitor_id STRING,
    ip STRING,
    session_id STRING,
    operating_sys STRING,
    keyword STRING
)
PARTITIONED BY (`year` STRING, `month` STRING, `day` STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS PARQUET
LOCATION 's3://database-name/athena-tables/';

 

 

4. Let's populate the Hive table from step 2 and check that Hive has created a new directory tree in the HDFS file system:

 

INSERT INTO TABLE cloudfront_logs_part
PARTITION
(
    `year`,
    `month`,
    `day`
)
SELECT
    log_date,
    user_id,
    page_path,
    referer,
    visitor_id,
    ip,
    session_id,
    operating_sys,
    keyword,
    year(log_date) as `year`,
    month(log_date) as `month`,
    day(log_date) as `day`
FROM
    cloudfront_logs;

 

If we run the ls command, we'll see a new directory hierarchy based on the date, for example /user/admin/partitioned/year=2018/month=7/day=9 (month() and day() return integers, so the values have no leading zeros):

hdfs dfs -ls /user/admin/partitioned

or

hdfs dfs -ls -R /user/admin/partitioned
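
To make the mapping from a timestamp to a partition directory concrete, here is a small sketch. The function name partition_path is hypothetical, and it assumes timestamps in the "YYYY-MM-DD HH:MM:SS" form; it only mimics the directory names that the INSERT above produces and does not talk to Hive or HDFS.

```shell
#!/bin/bash
# Hypothetical helper: derive the Hive-style partition directory for a timestamp
# of the form "YYYY-MM-DD HH:MM:SS". Leading zeros are stripped because Hive's
# year(), month() and day() functions return integers.
partition_path() {
    date_part=${1%% *}        # "2018-07-09"
    year=${date_part%%-*}     # "2018"
    rest=${date_part#*-}      # "07-09"
    month=${rest%%-*}         # "07"
    day=${rest#*-}            # "09"
    # Strip a single leading zero, if any ("07" becomes "7").
    echo "year=${year}/month=${month#0}/day=${day#0}"
}

partition_path "2018-07-09 12:34:56"
```

If you prefer zero-padded directories such as month=07, one option is to build the partition values as formatted strings in the SELECT instead of calling month() and day() directly.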

 

5. In this step we copy the partitioned directory structure, with its files, to a local directory outside of HDFS so that we can transfer these directories and files to the S3 bucket for Athena.

  • We'll perform all steps from 4 to 7 in one bash file.

  • If the EMR cluster has an old version of the AWS CLI, upgrade it first!

  • You can also just upgrade it without checking the version.

 

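#!/bin/bash

# If the AWS CLI is installed, upgrade it to the latest version.
aws --version
status=$?
if [ $status -eq 0 ] ; then
   pip install awscli --upgrade --user
fi

# Timestamp used to make file names unique for each run.
currentDate=$(date +"%Y%m%d_%H%M%S")

# Copy the partitioned directory tree from HDFS to a local directory.
mkdir -p /tmp/partitioned
sudo su hdfs -c "hdfs dfs -copyToLocal /user/admin/partitioned/* /tmp/partitioned/"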

 

6. We rename all files because of how the "aws s3 sync" command works: our job runs every day, and we don't want to overwrite existing files in the S3 bucket. Without renaming, a file from today's run with the same name as one from an earlier run could replace the object already in S3. The command uploads a local file if any of the following is true:

  • the size of the local file is different from the size of the S3 object,

  • the last modified time of the local file is newer than the last modified time of the S3 object,

  • or the local file does not exist under the specified bucket and prefix.
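
The three rules above can be restated as a small decision function. This is only an illustration of the logic as described in this post, not the actual aws s3 sync implementation; the function name needs_upload and its argument order are made up for the example.

```shell
#!/bin/bash
# Hypothetical re-statement of the three rules listed above.
# Sizes are in bytes, times in epoch seconds.
# An empty remote size means the object does not exist in the bucket.
needs_upload() {
    local_size=$1
    local_mtime=$2
    remote_size=$3
    remote_mtime=$4
    [ -z "$remote_size" ] && return 0                    # object missing in S3
    [ "$local_size" -ne "$remote_size" ] && return 0     # size differs
    [ "$local_mtime" -gt "$remote_mtime" ] && return 0   # local file is newer
    return 1
}

# Same name and size, but the local file is newer: it would replace the S3 object.
if needs_upload 1024 1531130000 1024 1531040000; then
    echo "upload"
else
    echo "skip"
fi
```

This is why a unique suffix per run matters: a renamed file never matches an existing object, so it is always uploaded as a new object instead of replacing an old one.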

The next step in the flow is to synchronize the local directory to the S3 bucket.

 

# List the local data files; find prints nothing if there are none.
hive_files=$(find /tmp/partitioned/ -type f)

if [ -n "$hive_files" ]; then
    # Append the run timestamp (currentDate, set earlier in the script) to each file name.
    for fileName in $hive_files
    do
        sudo mv "$fileName" "${fileName}_$currentDate"
    done

    aws s3 sync /tmp/partitioned/ s3://database-name/athena-tables
fi
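
One thing to watch if the local staging directory is reused between runs: files left over from a previous run would receive a second timestamp and be uploaded again. A minimal sketch of a rename helper that skips files already carrying a _YYYYMMDD_HHMMSS suffix is shown here; the name stamp_files is hypothetical, and the sketch assumes file names without newlines.

```shell
#!/bin/bash
# Hypothetical helper: append a run timestamp to every file under a directory,
# skipping files that already end in _YYYYMMDD_HHMMSS from an earlier run.
stamp_files() {
    dir=$1
    stamp=$2
    # Reads one path per line, so it assumes file names without newlines.
    find "$dir" -type f | while IFS= read -r file; do
        case "$file" in
            *_[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]_[0-9][0-9][0-9][0-9][0-9][0-9])
                continue ;;
        esac
        mv -- "$file" "${file}_${stamp}"
    done
}

# Example call (not run here):
#   stamp_files /tmp/partitioned "$(date +%Y%m%d_%H%M%S)"
```

A simpler alternative is to empty the staging directory at the start of each run, so that it only ever contains files from the current copy.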

 

7. In the last step we load all partitions automatically with an AWS CLI command:

 

aws athena start-query-execution --query-string "MSCK REPAIR TABLE cloudfront_logs_ath;" --result-configuration OutputLocation=s3://aws-athena-query-results-…/

 

  •  You can find the default query results bucket in the Settings menu of the Athena web console.
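
Note that start-query-execution is asynchronous: it returns a query execution ID right away, while the repair itself runs in the background. If a later step depends on the partitions being loaded, one option is to poll the query state. The sketch below assumes the AWS CLI is configured on the node; the function name wait_for_query and the POLL_SECONDS variable are hypothetical.

```shell
#!/bin/bash
# Hypothetical helper: poll Athena until the given query execution finishes.
# Returns 0 on SUCCEEDED, 1 on FAILED, CANCELLED or a CLI error.
wait_for_query() {
    query_id=$1
    while :; do
        state=$(aws athena get-query-execution \
            --query-execution-id "$query_id" \
            --query 'QueryExecution.Status.State' \
            --output text) || return 1
        case "$state" in
            SUCCEEDED) return 0 ;;
            FAILED|CANCELLED) return 1 ;;
        esac
        sleep "${POLL_SECONDS:-5}"   # still QUEUED or RUNNING
    done
}

# Usage (not run here): capture the ID printed by start-query-execution, for
# example by adding --query 'QueryExecutionId' --output text to that command,
# then call:
#   wait_for_query "$query_id" && echo "partitions loaded"
```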

 

Now you can query your table any time and decrease your costs by scanning less data.

A few important notes to add:

 

  • Add permissions to EMR_EC2_DefaultRole so that the EMR cluster can access Athena to run queries and load partitions (read the documentation thoroughly).

  • The preferred storage format for partitioned tables is PARQUET. You can use the TEXTFILE format, but values may end up shifted between columns.

  • Don't mix data in your directories: put each dataset in its own dedicated directory.

  • Create a database in Athena for your tables in the parent S3 bucket if you don't want to use the default database, for example:

 

CREATE DATABASE IF NOT EXISTS database_name
LOCATION 's3://database-name';

 

  • You can also create an EMR cluster from the command line, for example:

 

aws emr create-cluster --name "my cluster" --release-label emr-4.6.0 --use-default-roles \
--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=m4.xlarge \
--ec2-attributes KeyName=my-cluster,SubnetId=subnet-206aeb0a \
--applications Name=Hadoop Name=Hive Name=Hue \
--log-uri s3:///logs/emr/ \
--steps \
Type=CUSTOM_JAR,Name=import_to_hdfs,ActionOnFailure=CANCEL_AND_WAIT,Jar=s3://elasticmapreduce/libs/script-runner/script-runner.jar,Args=s3://tracking/scripts/import_s3_to_hdfs.sh \
Type=Hive,Name=CreateHiveTables,ActionOnFailure=CANCEL_AND_WAIT,Args=-f,s3://tracking/scripts/HiveCreateTable.sql \
Type=CUSTOM_JAR,Name=import_to_postgresql,ActionOnFailure=CANCEL_AND_WAIT,Jar=s3://elasticmapreduce/libs/script-runner/script-runner.jar,Args=s3://tracking/scripts/load_new_events_to_postgres.sh \
Type=CUSTOM_JAR,Name=load_into_athena,ActionOnFailure=CANCEL_AND_WAIT,Jar=s3://elasticmapreduce/libs/script-runner/script-runner.jar,Args=s3://tracking/scripts/load_into_athena.sh \
Type=CUSTOM_JAR,Name=terminate_cluster,ActionOnFailure=CANCEL_AND_WAIT,Jar=s3://elasticmapreduce/libs/script-runner/script-runner.jar,Args=s3://tracking/scripts/terminate_cluster.sh

 

As you can see, we saved all scripts in a dedicated S3 bucket and run each cluster with a few steps, most of which can also be run from the cluster console.

 

Good luck and share the knowledge!

 

Pavel Zeger.

 

 
