
Build a data lake with Apache Flink on Amazon EMR


To build a data-driven enterprise, it is important to democratize your enterprise data assets in a data catalog. With a unified data catalog, you can quickly search datasets and figure out data schema, data format, and location. The AWS Glue Data Catalog provides a uniform repository where disparate systems can store and find metadata to keep track of data in data silos.

Apache Flink is a widely used data processing engine for scalable streaming ETL, analytics, and event-driven applications. It provides precise time and state management with fault tolerance. Flink can process bounded streams (batch) and unbounded streams (streaming) with a unified API or application. After data is processed with Apache Flink, downstream applications can access the curated data with a unified data catalog. With unified metadata, both data processing and data consuming applications can access the tables using the same metadata.

This post shows you how to integrate Apache Flink in Amazon EMR with the AWS Glue Data Catalog so that you can ingest streaming data in real time and access the data in near-real time for business analysis.

Apache Flink connector and catalog architecture

Apache Flink uses a connector and catalog to interact with data and metadata. The following diagram shows the architecture of the Apache Flink connector for data read/write, and catalog for metadata read/write.

Flink Glue Architecture

For data read/write, Flink has the interface DynamicTableSourceFactory for read and DynamicTableSinkFactory for write. A different Flink connector implements these two interfaces to access data in different storage. For example, the Flink FileSystem connector has FileSystemTableFactory to read/write data in Hadoop Distributed File System (HDFS) or Amazon Simple Storage Service (Amazon S3), the Flink HBase connector has HBase2DynamicTableFactory to read/write data in HBase, and the Flink Kafka connector has KafkaDynamicTableFactory to read/write data in Kafka. Refer to Table & SQL Connectors for more information.
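For illustration, the following minimal Flink SQL sketch shows how the connector option in a table definition selects the corresponding factory; the table name, schema, and S3 path are placeholders and not part of this post's walkthrough:

-- The 'connector' option selects FileSystemTableFactory, which reads/writes
-- Parquet files under the given S3 path.
CREATE TABLE sample_events (
  event_id   BIGINT,
  event_time TIMESTAMP(3),
  payload    STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 's3://<your-bucket>/sample-events/',
  'format' = 'parquet'
);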

For metadata read/write, Flink has the catalog interface. Flink has three built-in implementations for the catalog. GenericInMemoryCatalog stores the catalog data in memory. JdbcCatalog stores the catalog data in a JDBC-supported relational database. As of this writing, MySQL and PostgreSQL databases are supported in the JDBC catalog. HiveCatalog stores the catalog data in Hive Metastore. HiveCatalog uses HiveShim to provide different Hive version compatibility. We can configure different metastore clients to use Hive Metastore or the AWS Glue Data Catalog. In this post, we configure the Amazon EMR property hive.metastore.client.factory.class to com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory (see Using the AWS Glue Data Catalog as the metastore for Hive) so that we can use the AWS Glue Data Catalog to store Flink catalog data. Refer to Catalogs for more information.
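As a brief sketch (the catalog names here are arbitrary and not used later in this post), each built-in catalog implementation is registered in Flink SQL through CREATE CATALOG with the corresponding type:

-- In-memory catalog: metadata lives only for the lifetime of the session.
CREATE CATALOG mem_catalog WITH ('type' = 'generic_in_memory');

-- Hive catalog: metadata goes to Hive Metastore, or to the AWS Glue Data Catalog
-- when hive.metastore.client.factory.class points to AWSGlueDataCatalogHiveClientFactory.
CREATE CATALOG hive_catalog WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/etc/hive/conf.dist'
);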

Most Flink built-in connectors, such as for Kafka, Amazon Kinesis, Amazon DynamoDB, Elasticsearch, or FileSystem, can use the Flink HiveCatalog to store metadata in the AWS Glue Data Catalog. However, some connector implementations such as Apache Iceberg have their own catalog management mechanism. FlinkCatalog in Iceberg implements the catalog interface in Flink and wraps Iceberg's own catalog implementation. The following diagram shows the relationship between Apache Flink, the Iceberg connector, and the catalog. For more information, refer to Creating catalogs and using catalogs and Catalogs.

Flink Iceberg Glue Architecture

Apache Hudi also has its own catalog management. Both HoodieCatalog and HoodieHiveCatalog implement a catalog interface in Flink. HoodieCatalog stores metadata in a file system such as HDFS. HoodieHiveCatalog stores metadata in Hive Metastore or the AWS Glue Data Catalog, depending on whether you configure hive.metastore.client.factory.class to use com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory. The following diagram shows the relationship between Apache Flink, the Hudi connector, and the catalog. For more information, refer to Create Catalog.

Flink Hudi Glue Architecture

Because Iceberg and Hudi have different catalog management mechanisms, we show three scenarios of Flink integration with the AWS Glue Data Catalog in this post:

  • Read/Write to Iceberg tables in Flink with metadata in Glue Data Catalog
  • Read/Write to Hudi tables in Flink with metadata in Glue Data Catalog
  • Read/Write to other storage formats in Flink with metadata in Glue Data Catalog

Solution overview

The following diagram shows the overall architecture of the solution described in this post.

Flink Glue Integration

In this solution, we enable an Amazon RDS for MySQL binlog to extract transaction changes in real time. The Amazon EMR Flink CDC connector reads the binlog data and processes the data. Transformed data can be stored in Amazon S3. We use the AWS Glue Data Catalog to store the metadata such as table schema and table location. Downstream data consumer applications such as Amazon Athena or Amazon EMR Trino access the data for business analysis.

The following are the high-level steps to set up this solution:

  1. Enable binlog for Amazon RDS for MySQL and initialize the database.
  2. Create an EMR cluster with the AWS Glue Data Catalog.
  3. Ingest change data capture (CDC) data with Apache Flink CDC in Amazon EMR.
  4. Store the processed data in Amazon S3 with metadata in the AWS Glue Data Catalog.
  5. Verify all table metadata is stored in the AWS Glue Data Catalog.
  6. Consume data with Athena or Amazon EMR Trino for business analysis.
  7. Update and delete source records in Amazon RDS for MySQL and validate the reflection of the data lake tables.

Prerequisites

This post uses an AWS Identity and Access Management (IAM) role with permissions for the following services:

  • Amazon RDS for MySQL (5.7.40)
  • Amazon EMR (6.9.0)
  • Amazon Athena
  • AWS Glue Data Catalog
  • Amazon S3

Enable binlog for Amazon RDS for MySQL and initialize the database

To enable CDC in Amazon RDS for MySQL, we need to configure binary logging for Amazon RDS for MySQL. Refer to Configuring MySQL binary logging for more information. We also create the database salesdb in MySQL and create the tables customer, order, and others to set up the data source.

  1. On the Amazon RDS console, choose Parameter groups in the navigation pane.
  2. Create a new parameter group for MySQL.
  3. Edit the parameter group you just created to set binlog_format=ROW.

RDS-Binlog-Format

  1. Edit the parameter group you just created to set binlog_row_image=full.

RDS-Binlog-Row-Image

  1. Create an RDS for MySQL DB instance with the parameter group.
  2. Note down the values for hostname, username, and password, which we use later.
  3. Download the MySQL database initialization script from Amazon S3 by running the following command:
aws s3 cp s3://emr-workshops-us-west-2/glue_immersion_day/scripts/salesdb.sql ./salesdb.sql

  1. Connect to the RDS for MySQL database and run the salesdb.sql command to initialize the database, providing the host name and user name according to your RDS for MySQL database configuration:
mysql -h <hostname> -u <username> -p
mysql> source salesdb.sql
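Optionally, as a quick sanity check (assuming the parameter group has been applied and the instance rebooted), you can confirm the binary log settings and the initialized tables from the same MySQL session:

-- Both should reflect the parameter group: ROW format and FULL row image.
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';
-- Confirm the source tables created by salesdb.sql.
USE salesdb;
SHOW TABLES;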

Create an EMR cluster with the AWS Glue Data Catalog

Starting with Amazon EMR 6.9.0, the Flink table API/SQL can integrate with the AWS Glue Data Catalog. To use the Flink and AWS Glue integration, you must create an Amazon EMR 6.9.0 or later version cluster.

  1. Create the file iceberg.properties for the Amazon EMR Trino integration with the Data Catalog. When the table format is Iceberg, your file should have the following content:
iceberg.catalog.type=glue
connector.name=iceberg

  1. Upload iceberg.properties to an S3 bucket, for example DOC-EXAMPLE-BUCKET.

For more information on how to integrate Amazon EMR Trino with Iceberg, refer to Use an Iceberg cluster with Trino.

  1. Create the file trino-glue-catalog-setup.sh to configure the Trino integration with the Data Catalog. Use trino-glue-catalog-setup.sh as the bootstrap script. Your file should have the following content (replace DOC-EXAMPLE-BUCKET with your S3 bucket name):
set -ex
sudo aws s3 cp s3://DOC-EXAMPLE-BUCKET/iceberg.properties /etc/trino/conf/catalog/iceberg.properties

  1. Upload trino-glue-catalog-setup.sh to your S3 bucket (DOC-EXAMPLE-BUCKET).

Refer to Create bootstrap actions to install additional software to run a bootstrap script.

  1. Create the file flink-glue-catalog-setup.sh to configure the Flink integration with the Data Catalog.
  2. Use a script runner and run the flink-glue-catalog-setup.sh script as an EMR step.

Your file should have the following content (the JAR file names here are for Amazon EMR 6.9.0; a later version may change the JAR names, so make sure to update them according to your Amazon EMR version).

Note that here we use an Amazon EMR step, not a bootstrap action, to run this script. An Amazon EMR step script is run after Amazon EMR Flink is provisioned.

set -ex

sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib
sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib
sudo cp /usr/lib/hive/lib/hive-exec.jar /usr/lib/flink/lib
sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /usr/lib/flink/lib
sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /usr/lib/flink/lib
sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar
sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar
sudo chmod 755 /usr/lib/flink/lib/hive-exec.jar
sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar
sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar

sudo wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar -O /usr/lib/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
sudo chmod 755 /usr/lib/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar

sudo ln -s /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar /usr/lib/flink/lib/
sudo ln -s /usr/lib/hudi/hudi-flink-bundle.jar /usr/lib/flink/lib/

sudo mv /usr/lib/flink/opt/flink-table-planner_2.12-1.15.2.jar /usr/lib/flink/lib/
sudo mv /usr/lib/flink/lib/flink-table-planner-loader-1.15.2.jar /usr/lib/flink/opt/

  1. Upload flink-glue-catalog-setup.sh to your S3 bucket (DOC-EXAMPLE-BUCKET).

Refer to Configuring Flink to Hive Metastore in Amazon EMR for more information on how to configure Flink and Hive Metastore. Refer to Run commands and scripts on an Amazon EMR cluster for more details on running the Amazon EMR step script.

  1. Create an EMR 6.9.0 cluster with the applications Hive, Flink, and Trino.

You can create an EMR cluster with the AWS Command Line Interface (AWS CLI) or the AWS Management Console. Refer to the appropriate subsection for instructions.

Create an EMR cluster with the AWS CLI

To use the AWS CLI, complete the following steps:

  1. Create the file emr-flink-trino-glue.json to configure Amazon EMR to use the Data Catalog. Your file should have the following content:
[
  {
    "Classification": "hive-site",
    "Properties": {
      "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
    }
  },
  {
    "Classification": "trino-connector-hive",
    "Properties": {
      "hive.metastore": "glue"
    }
  }
]

  1. Run the following command to create the EMR cluster. Provide your local emr-flink-trino-glue.json parent folder path, S3 bucket, EMR cluster Region, EC2 key name, and S3 bucket for EMR logs.
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Hive Name=Flink Name=Spark Name=Trino \
--region us-west-2 \
--name flink-trino-glue-emr69 \
--configurations "file:///<your configuration path>/emr-flink-trino-glue.json" \
--bootstrap-actions '[{"Path":"s3://DOC-EXAMPLE-BUCKET/trino-glue-catalog-setup.sh","Name":"Add iceberg.properties for Trino"}]' \
--steps '[{"Args":["s3://DOC-EXAMPLE-BUCKET/flink-glue-catalog-setup.sh"],"Type":"CUSTOM_JAR","ActionOnFailure":"CONTINUE","Jar":"s3://<region>.elasticmapreduce/libs/script-runner/script-runner.jar","Properties":"","Name":"Flink-glue-integration"}]' \
--instance-groups \
InstanceGroupType=MASTER,InstanceType=m6g.2xlarge,InstanceCount=1 \
InstanceGroupType=CORE,InstanceType=m6g.2xlarge,InstanceCount=2 \
--use-default-roles \
--ebs-root-volume-size 30 \
--ec2-attributes KeyName=<keyname> \
--log-uri s3://<s3-bucket-for-emr>/elasticmapreduce/

Create an EMR cluster on the console

To use the console, complete the following steps:

  1. On the Amazon EMR console, create an EMR cluster and select Use for Hive table metadata for AWS Glue Data Catalog settings.
  2. Add configuration settings with the following code:
[
  {
    "Classification": "trino-connector-hive",
    "Properties": {
      "hive.metastore": "glue"
    }
  }
]

EMR-6.9-Flink-Hive-Glue-1

  1. In the Steps section, add a step called Custom JAR.
  2. Set JAR location to s3://<region>.elasticmapreduce/libs/script-runner/script-runner.jar, where <region> is the Region in which your EMR cluster resides.
  3. Set Arguments to the S3 path you uploaded earlier.

EMR-6.9-Flink-Hive-Glue-2

  1. In the Bootstrap Actions section, choose Custom action.
  2. Set Script location to the S3 path you uploaded.

EMR-6.9-Flink-Hive-Glue-3

  1. Continue with the subsequent steps to complete your EMR cluster creation.

Ingest CDC data with Apache Flink CDC in Amazon EMR

The Flink CDC connector supports reading database snapshots and capturing updates in the configured tables. We deployed the Flink CDC connector for MySQL by downloading flink-sql-connector-mysql-cdc-2.2.1.jar and putting it into the Flink library when we created our EMR cluster. The Flink CDC connector can use the Flink Hive catalog to store Flink CDC table schemas in Hive Metastore or the AWS Glue Data Catalog. In this post, we use the Data Catalog to store our Flink CDC tables.

Complete the following steps to ingest RDS for MySQL databases and tables with Flink CDC and store metadata in the Data Catalog:

  1. SSH to the EMR primary node.
  2. Start a Flink YARN session by running the following command, providing your S3 bucket name:
flink-yarn-session -d -jm 2048 -tm 4096 -s 2 \
-D state.backend=rocksdb \
-D state.backend.incremental=true \
-D state.checkpoint-storage=filesystem \
-D state.checkpoints.dir=s3://<flink-glue-integration-bucket>/flink-checkpoints/ \
-D state.checkpoints.num-retained=10 \
-D execution.checkpointing.interval=10s \
-D execution.checkpointing.mode=EXACTLY_ONCE \
-D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
-D execution.checkpointing.max-concurrent-checkpoints=1

  1. Start the Flink SQL client CLI by running the following command:
/usr/lib/flink/bin/sql-client.sh embedded

  1. Create the Flink Hive catalog by specifying the catalog type as hive and providing your S3 bucket name:
CREATE CATALOG glue_catalog WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/etc/hive/conf.dist'
);
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS flink_cdc_db WITH ('hive.database.location-uri'= 's3://<flink-glue-integration-bucket>/flink-glue-for-hive/warehouse/');
use flink_cdc_db;

Because we configured the EMR Hive catalog to use the AWS Glue Data Catalog, all the databases and tables created in the Flink Hive catalog are stored in the Data Catalog.
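As a quick optional check (purely illustrative), you can list the catalog and database from the Flink SQL client; the same database also appears under Databases on the AWS Glue console:

SHOW CATALOGS;    -- glue_catalog is listed alongside default_catalog
SHOW DATABASES;   -- flink_cdc_db is listed, backed by the AWS Glue Data Catalog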

  1. Create the Flink CDC tables, providing the host name, user name, and password for the RDS for MySQL instance you created earlier.

Note that because the RDS for MySQL user name and password will be stored in the Data Catalog as table properties, you should enable AWS Glue database/table authorization with AWS Lake Formation to protect your sensitive data.

CREATE TABLE `glue_catalog`.`flink_cdc_db`.`customer_cdc` (
`CUST_ID` double NOT NULL,
`NAME` STRING NOT NULL,
`MKTSEGMENT` STRING NOT NULL,
PRIMARY KEY (`CUST_ID`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<hostname>',
'port' = '3306',
'username' = '<username>',
'password' = '<password>',
'database-name' = 'salesdb',
'table-name' = 'CUSTOMER'
);

CREATE TABLE `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` (
`SITE_ID` double NOT NULL,
`CUST_ID` double NOT NULL,
`ADDRESS` STRING NOT NULL,
`CITY` STRING NOT NULL,
`STATE` STRING NOT NULL,
`COUNTRY` STRING NOT NULL,
`PHONE` STRING NOT NULL,
PRIMARY KEY (`SITE_ID`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<hostname>',
'port' = '3306',
'username' = '<username>',
'password' = '<password>',
'database-name' = 'salesdb',
'table-name' = 'CUSTOMER_SITE'
);

CREATE TABLE `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` (
`ORDER_ID` int NOT NULL,
`SITE_ID` double NOT NULL,
`ORDER_DATE` TIMESTAMP NOT NULL,
`SHIP_MODE` STRING NOT NULL
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<hostname>',
'port' = '3306',
'username' = '<username>',
'password' = '<password>',
'database-name' = 'salesdb',
'table-name' = 'SALES_ORDER_ALL',
'scan.incremental.snapshot.enabled' = 'FALSE'
);

  1. Question the desk you simply created:
SELECT count(O.ORDER_ID) AS ORDER_COUNT,
C.CUST_ID,
C.NAME,
C.MKTSEGMENT
FROM   customer_cdc C
JOIN customer_site_cdc CS
ON C.CUST_ID = CS.CUST_ID
JOIN sales_order_all_cdc O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT;

You will get a query result like the following screenshot.

Flink-SQL-CDC-Test

Store processed data in Amazon S3 with metadata in the Data Catalog

As we are ingesting the relational database data from Amazon RDS for MySQL, raw data may be updated or deleted. To support data update and delete, we can choose data lake technologies such as Apache Iceberg or Apache Hudi to store the processed data. As we mentioned earlier, Iceberg and Hudi have different catalog management. We show both scenarios of using Flink to read/write the Iceberg and Hudi tables with metadata in the AWS Glue Data Catalog.

For non-Iceberg and non-Hudi formats, we use a FileSystem Parquet file to show how the Flink built-in connector uses the Data Catalog.

Read/Write to Iceberg tables in Flink with metadata in Glue Data Catalog

The following diagram shows the architecture for this configuration.

Flink Glue Integration for Iceberg

  1. Create a Flink Iceberg catalog using the Data Catalog by specifying catalog-impl as org.apache.iceberg.aws.glue.GlueCatalog.

For more information about Flink and Data Catalog integration for Iceberg, refer to Glue Catalog.

  1. In the Flink SQL client CLI, run the following command, providing your S3 bucket name:
CREATE CATALOG glue_catalog_for_iceberg WITH (
'type'='iceberg',
'warehouse'='s3://<flink-glue-integration-bucket>/flink-glue-for-iceberg/warehouse/',
'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
'lock-impl'='org.apache.iceberg.aws.glue.DynamoLockManager',
'lock.table'='FlinkGlue4IcebergLockTable' );

  1. Create an Iceberg table to store processed data:
USE CATALOG glue_catalog_for_iceberg;
CREATE DATABASE IF NOT EXISTS flink_glue_iceberg_db;
USE flink_glue_iceberg_db;
CREATE TABLE `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT,
PRIMARY KEY (`CUSTOMER_ID`) NOT ENFORCED
)
WITH (
'format-version'='2',
'write.upsert.enabled'='true');

  1. Insert the processed data into Iceberg:
INSERT INTO `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary`
SELECT CAST(C.CUST_ID AS BIGINT) CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY,
count(O.ORDER_ID) AS ORDER_COUNT
FROM   `glue_catalog`.`flink_cdc_db`.`customer_cdc` C
JOIN `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` CS
ON C.CUST_ID = CS.CUST_ID
JOIN `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY;

Read/Write to Hudi tables in Flink with metadata in Glue Data Catalog

The following diagram shows the architecture for this configuration.

Flink Glue Integration for Hudi

Complete the following steps:

  1. Create a catalog for Hudi to use the Hive catalog by specifying mode as hms.

Because we already configured Amazon EMR to use the Data Catalog when we created the EMR cluster, this Hudi Hive catalog uses the Data Catalog under the hood. For more information about Flink and Data Catalog integration for Hudi, refer to Create Catalog.

  1. In the Flink SQL client CLI, run the following command, providing your S3 bucket name:
CREATE CATALOG glue_catalog_for_hudi WITH (
'type' = 'hudi',
'mode' = 'hms',
'table.external' = 'true',
'default-database' = 'default',
'hive.conf.dir' = '/etc/hive/conf.dist',
'catalog.path' = 's3://<flink-glue-integration-bucket>/flink-glue-for-hudi/warehouse/'
);

  1. Create a Hudi table using the Data Catalog, and provide your S3 bucket name:
USE CATALOG glue_catalog_for_hudi;
CREATE DATABASE IF NOT EXISTS flink_glue_hudi_db;
use flink_glue_hudi_db;
CREATE TABLE `glue_catalog_for_hudi`.`flink_glue_hudi_db`.`customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT,
PRIMARY KEY (`CUSTOMER_ID`) NOT ENFORCED
)
WITH (
'connector' = 'hudi',
'write.tasks' = '4',
'path' = 's3://<flink-glue-integration-bucket>/flink-glue-for-hudi/warehouse/customer_summary',
'table.type' = 'COPY_ON_WRITE',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '1'
);

  1. Insert the processed data into Hudi:
INSERT INTO `glue_catalog_for_hudi`.`flink_glue_hudi_db`.`customer_summary`
SELECT CAST(C.CUST_ID AS BIGINT) CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY,
count(O.ORDER_ID) AS ORDER_COUNT
FROM   `glue_catalog`.`flink_cdc_db`.`customer_cdc` C
JOIN `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` CS
ON C.CUST_ID = CS.CUST_ID
JOIN `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY;

Read/Write to other storage formats in Flink with metadata in Glue Data Catalog

The following diagram shows the architecture for this configuration.

Flink Glue Integration for Parquet

We already created the Flink Hive catalog in an earlier step, so we'll reuse that catalog.

  1. In the Flink SQL client CLI, run the following command:
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS flink_hive_parquet_db;
use flink_hive_parquet_db;

We change the SQL dialect to Hive to create a table with Hive syntax.

  1. Create a table with the following SQL, and provide your S3 bucket name:
SET table.sql-dialect=hive;

CREATE TABLE `customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT
)
STORED AS parquet
LOCATION 's3://<flink-glue-integration-bucket>/flink-glue-for-hive-parquet/warehouse/customer_summary';

Because Parquet files don't support updated rows, we can't consume the CDC data directly. However, we can consume data from Iceberg or Hudi.

  1. Use the following code to query the Iceberg table and insert data into the Parquet table:
SET table.sql-dialect=default;
SET execution.runtime-mode = batch;
INSERT INTO `glue_catalog`.`flink_hive_parquet_db`.`customer_summary`
SELECT * from `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary`;

Verify all table metadata is stored in the Data Catalog

You can navigate to the AWS Glue console to verify that all the tables are stored in the Data Catalog.

  1. On the AWS Glue console, choose Databases in the navigation pane to list all the databases we created.

Glue-Databases

  1. Open a database and verify that all the tables are in that database.

Glue-Tables

Consume data with Athena or Amazon EMR Trino for business analysis

You can use Athena or Amazon EMR Trino to access the result data.

Query the data with Athena

To access the data with Athena, complete the following steps:

  1. Open the Athena query editor.
  2. Choose flink_glue_iceberg_db for Database.

You should see the customer_summary table listed.

  1. Run the following SQL script to query the Iceberg result table:
select * from customer_summary order by order_count desc limit 10

The query result will look like the following screenshot.

Athena-Iceberg-Query

  1. For the Hudi table, change Database to flink_glue_hudi_db and run the same SQL query.

Athena-Hudi-Query

  1. For the Parquet table, change Database to flink_hive_parquet_db and run the same SQL query.

Athena-Parquet-Query

Query the data with Amazon EMR Trino

To access Iceberg with Amazon EMR Trino, SSH to the EMR primary node.

  1. Run the following command to start the Trino CLI:
trino-cli --catalog iceberg

Amazon EMR Trino can now query the tables in the AWS Glue Data Catalog.

  1. Run the following commands to query the result table:
show schemas;
use flink_glue_iceberg_db;
show tables;
select * from customer_summary order by order_count desc limit 10;

The query result looks like the following screenshot.

EMR-Trino-Iceberg-Query

  1. Exit the Trino CLI.
  2. Start the Trino CLI with the hive catalog to query the Hudi table:
trino-cli --catalog hive

  3. Run the following commands to query the Hudi table:
show schemas;
use flink_glue_hudi_db;
show tables;
select * from customer_summary order by order_count desc limit 10;

Update and delete source records in Amazon RDS for MySQL and validate the reflection of the data lake tables

We can update and delete some records in the RDS for MySQL database and then validate that the changes are reflected in the Iceberg and Hudi tables.

  1. Connect to the RDS for MySQL database and run the following SQL:
update CUSTOMER set NAME = 'updated_name' where CUST_ID=7;

delete from CUSTOMER where CUST_ID=11;

  1. Query the customer_summary table with Athena or Amazon EMR Trino.

The updated and deleted records are reflected in the Iceberg and Hudi tables.

Athena-Iceberg-Query-Updated
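You can also confirm the individual changes with a query like the following in Athena against flink_glue_iceberg_db (a hedged check that reuses the key values from the update and delete statements above):

-- CUST_ID 7 was updated and CUST_ID 11 was deleted in the source database.
select customer_id, name, mktsegment
from customer_summary
where customer_id in (7, 11);
-- Expect one row with name = 'updated_name' for customer 7 and no row for customer 11.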

Clean up

If you're done with this exercise, complete the following steps to delete your resources and stop incurring costs:

  1. Delete the RDS for MySQL database.
  2. Delete the EMR cluster.
  3. Drop the databases and tables created in the Data Catalog.
  4. Remove the files in Amazon S3.

Conclusion

This post showed you how to integrate Apache Flink in Amazon EMR with the AWS Glue Data Catalog. You can use a Flink SQL connector to read/write data in a different store, such as Kafka, CDC, HBase, Amazon S3, Iceberg, or Hudi. You can also store the metadata in the Data Catalog. The Flink table API has the same connector and catalog implementation mechanism. In a single session, you can use multiple catalog instances pointing to different types, like IcebergCatalog and HiveCatalog, and use them interchangeably in your queries. You can also write code with the Flink table API to develop the same solution to integrate Flink and the Data Catalog.

In our solution, we consumed the RDS for MySQL binary log directly with Flink CDC. You can also use Amazon MSK Connect to consume the binary log with MySQL Debezium and store the data in Amazon Managed Streaming for Apache Kafka (Amazon MSK). Refer to Create a low-latency source-to-data lake pipeline using Amazon MSK Connect, Apache Flink, and Apache Hudi for more information.

With the Amazon EMR Flink unified batch and streaming data processing capability, you can ingest and process data with one computing engine. With Apache Iceberg and Hudi integrated in Amazon EMR, you can build an evolvable and scalable data lake. With the AWS Glue Data Catalog, you can manage all enterprise data catalogs in a unified manner and consume data easily.

Follow the steps in this post to build your unified batch and streaming solution with Amazon EMR Flink and the AWS Glue Data Catalog. Please leave a comment if you have any questions.


About the Authors

Jianwei Li is a Senior Analytics Specialist TAM. He provides consulting services for AWS Enterprise Support customers to design and build modern data platforms.


Samrat Deb is a Software Development Engineer at Amazon EMR. In his spare time, he loves exploring new places, different cultures, and food.


Prabhu Josephraj is a Senior Software Development Engineer working for Amazon EMR. He is focused on leading the team that builds solutions in Apache Hadoop and Apache Flink. In his spare time, Prabhu enjoys spending time with his family.


