Automatic Open Source-based Data Pipelines? Openshift To the Rescue!

Photo by Stephen Dawson on Unsplash

Demo Architecture

Prerequisites

  • A running Ceph cluster (RHCS 4 or later)
  • A running Openshift 4 cluster (4.6.8 or later)
  • An OCS cluster, deployed in external mode, to provide both Object and Block storage
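
Before moving on, it’s worth quickly verifying those versions. A minimal sketch (run the Ceph command from a node or toolbox pod that has admin access to the cluster):

$ oc version      # the server version should report 4.6.8 or later
$ ceph versions   # all daemons should report RHCS 4 (Nautilus-based) or later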

Installation

Create a new project in your Openshift cluster, where all resources should be deployed:

$ oc new-project data-engineering-demo
Make sure the needed operators (such as AMQ Streams and Presto) are installed in the created project via the Openshift Operator Hub.

Installed Operators in the created project

Next, clone the demo repository, which contains all the scripts used throughout this demo:

$ git clone https://github.com/shonpaz123/cephdemos.git
$ cd cephdemos/data-engineering-pipeline-demo-ocp

Data Services Preparation

Preparing our S3 environment

Now that we have all the prerequisites ready, let’s start by creating the needed S3 resources. As we are using an external Ceph cluster, we need to create an S3 user in order to interact with the cluster, as well as an S3 bucket so that Kafka can export our events to the data lake. Let’s create those resources:

$ cd 01-ocs-external-ceph && ./run.sh && cd ..
{
    "user_id": "data-engineering-demo",
    "display_name": "data-engineering-demo",
    "email": "",
    "suspended": 0,
    "max_buckets": 1000,
    "subusers": [],
    "keys": [
        {
            "user": "data-engineering-demo",
            "access_key": "HC8V2PT7HX8ZFS8NQ37R",
            "secret_key": "Y6CENKXozDDikJHQgkbLFM38muKBnmWBsAA1DXyU"
        }
    .
    .
    .
}
make_bucket: music-chart-songs-store-changelog
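
Under the hood, the script boils down to creating the RGW user and the bucket, roughly along these lines (a simplified sketch; the actual run.sh in the repository may differ, and <your-rgw-endpoint> is a placeholder for your Ceph RGW endpoint):

$ radosgw-admin user create --uid=data-engineering-demo --display-name=data-engineering-demo
$ aws s3 mb s3://music-chart-songs-store-changelog --endpoint-url http://<your-rgw-endpoint>

The access and secret keys printed above are the credentials the rest of the pipeline will use to reach the bucket.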

Deploying Kafka as the New ETL

Now that we have our S3 ready, we need to deploy all the needed Kafka resources. In this section we’ll deploy a Kafka cluster using the AMQ Streams operator, which is offered via the Openshift Operator Hub. Additionally, we’ll deploy Kafka Topics and Kafka Connect, in order to export all of the existing topic events to our S3 bucket. Important! Make sure that you change the endpoint URL to suit your own environment, or else Kafka Connect will try to export the events with no success.
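
For reference, the endpoint typically lives in the S3 sink connector configuration. A minimal sketch of such a connector, assuming a Confluent-style S3 sink class (the actual connector class, property names, and API version used in the repository may differ):

$ cat <<'EOF' | oc apply -f -
apiVersion: kafka.strimzi.io/v1beta2                 # adjust to the API version served by your AMQ Streams operator
kind: KafkaConnector
metadata:
  name: s3-sink-connector                            # hypothetical name
  labels:
    strimzi.io/cluster: my-connect-cluster           # binds the connector to our Kafka Connect cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector     # assumption: a Confluent-style S3 sink
  tasksMax: 1
  config:
    topics: music-chart-songs-store-changelog
    s3.bucket.name: music-chart-songs-store-changelog
    store.url: http://<your-rgw-endpoint>            # <-- this is the endpoint URL to change
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    flush.size: "1"
EOF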

$ cd 02-kafka && ./run.sh && cd ..
$ oc get pods
NAME                                                  READY   STATUS    RESTARTS   AGE
amq-streams-cluster-operator-v1.6.2-5b688f757-vhqcq   1/1     Running   0          7h35m
my-cluster-entity-operator-5dfbdc56bd-75bxj           3/3     Running   0          92s
my-cluster-kafka-0                                    1/1     Running   0          2m10s
my-cluster-kafka-1                                    1/1     Running   0          2m10s
my-cluster-kafka-2                                    1/1     Running   0          2m9s
my-cluster-zookeeper-0                                1/1     Running   0          2m42s
my-connect-cluster-connect-7bdc77f479-vwdbs           1/1     Running   0          71s
presto-operator-dbbc6b78f-m6p6l                       1/1     Running   0          7h30m
$ oc get kt
NAME                                                          CLUSTER      PARTITIONS   REPLICATION FACTOR
connect-cluster-configs                                       my-cluster   1            3
connect-cluster-offsets                                       my-cluster   25           3
connect-cluster-status                                        my-cluster   5            3
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a   my-cluster   50           3
music-chart-songs-store-changelog                             my-cluster   1            1
played-songs                                                  my-cluster   12           3
songs                                                         my-cluster   12           3
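
The topics themselves are managed declaratively by the Topic Operator. As an illustration, a topic such as played-songs could be described with a KafkaTopic resource along these lines (a sketch; the real manifests live in the 02-kafka directory):

$ cat <<'EOF' | oc apply -f -
apiVersion: kafka.strimzi.io/v1beta2      # adjust to the API version served by your AMQ Streams operator
kind: KafkaTopic
metadata:
  name: played-songs
  labels:
    strimzi.io/cluster: my-cluster        # attaches the topic to our Kafka cluster
spec:
  partitions: 12
  replicas: 3
EOF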

Running Presto for Distributed Querying

In this demo, we’ll use Presto’s ability to query S3 bucket prefixes (similar to tables in relational databases). Presto needs a schema to be created so that it understands the structure of the files it queries. In our example, all events that are exported to our S3 bucket will look like the following:

{"count":7,"songName":"The Good The Bad And The Ugly"}

Now let’s deploy Presto, together with its Hive Metastore and the Postgres database that backs it:

$ cd 04-presto && ./run.sh && cd ..
$ oc get pods | egrep -e "presto|postgres"
NAME                                                READY   STATUS    RESTARTS   AGE
hive-metastore-presto-cluster-576b7bb848-7btlw      1/1     Running   0          15s
postgres-68d5445b7c-g9qkj                           1/1     Running   0          77s
presto-coordinator-presto-cluster-8f6cfd6dd-g9p4l   1/2     Running   0          15s
presto-operator-dbbc6b78f-m6p6l                     1/1     Running   0          7h33m
presto-worker-presto-cluster-5b87f7c988-cg9m6       1/1     Running   0          15s
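
Behind the scenes, Presto reaches our Ceph RGW through its Hive connector catalog. The operator generates this configuration for us, but conceptually it boils down to a hive.properties catalog along these lines (a sketch; the property names come from the Presto Hive connector, while the metastore service name, endpoint, and credentials are placeholders for our environment):

$ cat <<'EOF'
# illustrative only; the operator manages the real catalog file
connector.name=hive-hadoop2
# assumed metastore service name and port
hive.metastore.uri=thrift://hive-metastore-presto-cluster:9083
hive.s3.endpoint=http://<your-rgw-endpoint>
hive.s3.aws-access-key=<ACCESS_KEY>
hive.s3.aws-secret-key=<SECRET_KEY>
hive.s3.path-style-access=true
EOF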

Visualizing real-time data with Superset

Superset is a visualization tool that can build charts and dashboards on top of many SQL data sources, such as Presto, Postgres, etc. As Presto has no real UI that lets us explore our data, control permissions, and manage RBAC, we’ll use Superset for those tasks.

$ cd 05-superset && ./run.sh && cd ..
$ oc get pods | grep superset
superset-1-deploy        0/1     Completed   0          72s
superset-1-g65xr         1/1     Running     0          67s
superset-db-init-6q75s   0/1     Completed   0          71s
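
Once Superset is up, we’ll register Presto as a database. Superset expects an SQLAlchemy URI of the form presto://<user>@<host>:<port>/<catalog>; in our environment it would look roughly like the following (the coordinator service name and port are assumptions, verify them with oc get svc):

$ oc get svc | grep presto
# SQLAlchemy URI to paste into Superset's "Add Database" form (hypothetical service name):
# presto://hive@presto-coordinator-presto-cluster:8080/hive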

Data Logic Preparation

After we have all of our infrastructure services ready, we need to create the data logic behind our streaming application. As Presto queries data from our S3 bucket, we need to create a schema, so that Presto knows where our data is located, and a table, so that it knows how the data is structured.

$ oc rsh $(oc get pods | grep coordinator | grep Running | awk '{print $1}')
$ presto-cli --catalog hive
presto> CREATE SCHEMA hive.songs WITH (location='s3a://music-chart-songs-store-changelog/music-chart-songs-store-changelog.json/');
presto> USE hive.songs;
presto:songs> CREATE TABLE songs (count int, songName varchar) WITH (format = 'json', external_location = 's3a://music-chart-songs-store-changelog/music-chart-songs-store-changelog.json/');
presto:songs> select * from songs;
 count | songname
-------+----------
(0 rows)
Query 20210203_162730_00005_7hsqi, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
1.01 [0 rows, 0B] [0 rows/s, 0B/s]

Streaming Real-Time Events

Now that all resources are ready to use, we can finally deploy our streaming application! Our streaming application is actually a Kafka producer that simulates a media player: it has a pre-defined list of songs that are randomly “played”. Each time a user plays a song, an event is sent to a Kafka topic. A second application, music-chart, consumes those events and keeps a running play count per song, as we’ll see in its logs.

$ cd 03-music-chart-app && ./run.sh && cd ..
$ oc get pods | egrep -e "player|music"
music-chart-576857c7f8-7l65x   1/1     Running   0          18s
player-app-79fb9cd54f-bhtl5    1/1     Running   0          19s
$ oc logs player-app-79fb9cd54f-bhtl5
2021-02-03 16:28:41,970 INFO  [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 1: The Good The Bad And The Ugly played.
2021-02-03 16:28:46,970 INFO [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 1: The Good The Bad And The Ugly played.
2021-02-03 16:28:51,970 INFO [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 2: Believe played.
2021-02-03 16:28:56,970 INFO [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 3: Still Loving You played.
2021-02-03 16:29:01,972 INFO [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 2: Believe played.
2021-02-03 16:29:06,970 INFO [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 7: Fox On The Run played.
$ oc logs music-chart-576857c7f8-7l65x
[KTABLE-TOSTREAM-0000000006]: 2, PlayedSong [count=1, songName=Believe]
[KTABLE-TOSTREAM-0000000006]: 8, PlayedSong [count=1, songName=Perfect]
[KTABLE-TOSTREAM-0000000006]: 3, PlayedSong [count=1, songName=Still Loving You]
[KTABLE-TOSTREAM-0000000006]: 1, PlayedSong [count=1, songName=The Good The Bad And The Ugly]
[KTABLE-TOSTREAM-0000000006]: 6, PlayedSong [count=1, songName=Into The Unknown]
[KTABLE-TOSTREAM-0000000006]: 3, PlayedSong [count=2, songName=Still Loving You]
[KTABLE-TOSTREAM-0000000006]: 5, PlayedSong [count=1, songName=Sometimes]
[KTABLE-TOSTREAM-0000000006]: 2, PlayedSong [count=2, songName=Believe]
[KTABLE-TOSTREAM-0000000006]: 1, PlayedSong [count=2, songName=The Good The Bad And The Ugly]
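
We can also tap the raw events directly from one of the Kafka broker pods with the console consumer (a sketch; the my-cluster-kafka-bootstrap service name and the plain listener on port 9092 follow the AMQ Streams defaults and may differ in your setup):

$ oc exec -it my-cluster-kafka-0 -- \
    bin/kafka-console-consumer.sh \
    --bootstrap-server my-cluster-kafka-bootstrap:9092 \
    --topic played-songs \
    --from-beginning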

After a short while, Kafka Connect starts exporting the topic events to our S3 bucket, which we can verify with an S3 browser:

S3 browser of our created bucket prefix
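
The same check can be done from the CLI (a sketch, assuming the AWS CLI is configured with the access and secret keys created earlier and <your-rgw-endpoint> points at your Ceph RGW):

$ aws s3 ls s3://music-chart-songs-store-changelog/music-chart-songs-store-changelog.json/ --endpoint-url http://<your-rgw-endpoint>

Now, querying the table from the Presto CLI again returns the exported events:
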
$ presto-cli --catalog hive
presto> USE hive.songs;
presto:songs> select * from songs;
 count |           songname
-------+-------------------------------
     1 | Bohemian Rhapsody
     4 | Still Loving You
     1 | The Good The Bad And The Ugly
     3 | Believe
     1 | Perfect
     1 | Sometimes
     2 | The Good The Bad And The Ugly
     2 | Bohemian Rhapsody
     3 | Still Loving You
     4 | Sometimes
     2 | Into The Unknown
     4 | Believe
     4 | Into The Unknown
     2 | Sometimes
     5 | Still Loving You
     3 | The Good The Bad And The Ugly

To reach the Superset UI, let’s grab its route:

$ oc get route
NAME       HOST/PORT                                             PATH   SERVICES   PORT       TERMINATION   WILDCARD
superset   superset-data-engineering-demo.apps.ocp.spaz.local           superset   8088-tcp                 None

In the Superset UI, we can now test the Presto connection while adding it as a database, build a query, and visualize the results in a real-time dashboard:

Testing Presto's connection while creating a database

Creating a Query to Visualization

Real-Time data dashboard

Conclusion

In this demo, we saw how we can leverage open source products to run automatic data pipelines, all orchestrated on Openshift. As Kubernetes adoption keeps breaking records, organizations should consider moving their workloads to Kubernetes so that their data services won’t be left behind. Using Red Hat and partner Operators, Openshift offers both day-1 and day-2 management for your data services.
