Why and How I Integrated Airbyte and Apache Hudi

Harsha Teja Kanna
Published in SelectFrom
8 min read · Jan 19, 2022

New:

More related content at https://www.denote.dev/

Follow-up: https://www.ekalavya.dev/how-i-integrated-airbyte-and-apache-hudi-again/

Follow-up on this blog: https://www.ekalavya.dev/why-and-how-i-implemented-airbyte-protocol-in-rust/

Towards the end of 2021, I was working on putting Apache Hudi into production at work. As part of the effort, I ran it on various datasets in cloud storage to create transactional data-lake tables and set up incremental syncing of CDC changes.

While the end result of the effort was incredible, it did take a lot of time to understand various Apache Hudi features and how to configure them. I also faced some issues/bugs, which were fixed by the Hudi team in recent releases.

Eventually, I even made a small contribution to support S3EventsHoodieIncrSource in the 0.10.0 release. The members of the official Hudi Slack were very helpful.
This article details my learnings from this experience.

Problem statement

While trying to make Hudi easy for the team to operate and to get the configuration right, I had to go through the following table-creation process many times, for many tables.

- Unload historical data from the existing warehouse.
- Bootstrap Hudi table.
- Run catchup of the CDC events during the bootstrap runtime (many hours for huge tables).
- Finally, set up the incremental delta sync of the table.

Each of the above steps involves detail-oriented, careful work, as there is a variety of sources (databases), destinations (storage), tables, and environments.
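
To give a flavor of the configuration involved (not my exact setup), here is a minimal Java sketch of steps 2 and 4 using the Spark DataSource API. The table name, key fields, and paths are hypothetical.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class HudiTableBootstrapSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hudi-bootstrap-sketch")
        // Hudi requires the Kryo serializer to be registered.
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate();

    // Step 1 output: historical data unloaded from the warehouse (hypothetical path).
    Dataset<Row> historical = spark.read().parquet("s3a://my-bucket/unload/orders/");

    // Step 2: bootstrap the Hudi table with a bulk insert.
    historical.write().format("hudi")
        .option("hoodie.table.name", "orders")
        .option("hoodie.datasource.write.recordkey.field", "order_id")
        .option("hoodie.datasource.write.precombine.field", "updated_at")
        .option("hoodie.datasource.write.partitionpath.field", "order_date")
        .option("hoodie.datasource.write.operation", "bulk_insert")
        .mode(SaveMode.Overwrite)
        .save("s3a://my-lake/tables/orders/");

    // Steps 3 and 4: CDC catch-up and incremental sync reuse the same table
    // configuration, but switch the write operation to "upsert".
    Dataset<Row> cdcBatch = spark.read().parquet("s3a://my-bucket/cdc/orders/");
    cdcBatch.write().format("hudi")
        .option("hoodie.table.name", "orders")
        .option("hoodie.datasource.write.recordkey.field", "order_id")
        .option("hoodie.datasource.write.precombine.field", "updated_at")
        .option("hoodie.datasource.write.partitionpath.field", "order_date")
        .option("hoodie.datasource.write.operation", "upsert")
        .mode(SaveMode.Append)
        .save("s3a://my-lake/tables/orders/");
  }
}
```

Multiply this by dozens of tables, each with its own keys, partitioning, and source quirks, and the need for automation becomes obvious.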

Though I developed a declarative configuration (a GitOps-style approach) to track all the Hudi and Spark configurations, it is not sufficient.
Moreover, mapping the schema of the table all the way through the above steps is prone to errors, and schema-mismatch errors in Apache Hudi logs need some digging to identify. I needed a tool built to discover, manage, and map data sources and schemas onto Hudi table configurations.

What was I thinking of doing?

The first course of action was to implement code that automates the table-creation steps and infrastructure provisioning, customized to our environment and orchestrated by Airflow.

But, having gained a deeper understanding of Hudi table configurations and the setup process, I ideated a 'table-service' that would simplify the effort and create a self-service data-lake platform. Further, I wanted to capture my knowledge in the form of sane default configurations that external teams could easily consume.

I have a go-to framework/platform for any stateful workflow logic and automation: Temporal. It is a solid piece of technology that every developer should know and learn.

I have been using it since its initial open-sourcing as Cadence Workflow. I also worked on its adoption at a large telecommunications provider in 2020, where I led building the infrastructure and Kubernetes provisioning in the private cloud using Temporal. Learn more about its use cases here.

I recommend Temporal to all development teams because it does away with the need for specialized knowledge about using a workflow system or its DSL.
We can author complex, long-running, stateful logic in our favorite programming language through its programming model. The productivity it brings is palpable. You just develop independent microservices, and Temporal handles the distributed state, queues, and resiliency management. Note: it is NOT for #NoCode-oriented teams.
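
To give a flavor of that programming model, here is a minimal Temporal Java sketch of a table-onboarding workflow. The workflow and activity names are hypothetical, not my actual table-service code.

```java
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

import java.time.Duration;

// Hypothetical activities mirroring the table-creation steps listed earlier.
@ActivityInterface
interface TableActivities {
  void unloadHistoricalData(String table);
  void bootstrapHudiTable(String table);
  void catchUpCdc(String table);
  void startIncrementalSync(String table);
}

@WorkflowInterface
interface TableOnboardingWorkflow {
  @WorkflowMethod
  void onboard(String table);
}

class TableOnboardingWorkflowImpl implements TableOnboardingWorkflow {
  private final TableActivities activities =
      Workflow.newActivityStub(
          TableActivities.class,
          ActivityOptions.newBuilder()
              .setStartToCloseTimeout(Duration.ofHours(12)) // long-running steps
              .build());

  @Override
  public void onboard(String table) {
    // Temporal persists the workflow state between each of these calls,
    // so a crash or redeploy resumes from the last completed step.
    activities.unloadHistoricalData(table);
    activities.bootstrapHudiTable(table);
    activities.catchUpCdc(table);
    activities.startIncrementalSync(table);
  }
}
```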

Coming back to Airbyte and Hudi: after partially implementing the table-service as a side project, I stumbled upon the perfect platform for the problem I was solving.

It is based on Temporal and implemented in Java (in which I had earlier development experience).

It's Airbyte.

I instantly knew that it would be good (I am biased, as it is built on top of Temporal).
So, I took a break from my love for Go and started diving into Airbyte's open-source codebase on GitHub.

Airbyte is a data integration platform. It surely lives up to what it claims: 'Data integration made simple, secure and extensible.'
Airbyte's user interface is pretty slick. We can create connections between different sources and destinations and sync them with its scheduler (backed by Temporal).

Airbyte is an innovative ELT platform with a vibrant community.

What I ended up doing!

I started looking into the Airbyte documentation to understand how to extend it with the connector SDKs.

I faced issues running it on my M1 Mac Mini. So, I helped resolve them first.

Then, I started developing an Apache Hudi destination using the Java SDK, but quickly abandoned the idea. I will go into more detail on this later.

Then I looked at how the 'T' part of the platform is implemented: the 'Transformation' in ELT.

Airbyte leverages Dbt to transform the data in the destination after the sync from the source.

Currently, transformations cannot be scheduled on their own. They execute after the sync.

Apache Hudi had Dbt integration in development at that time; it is now available in the 0.10.0 release. So, I thought a custom transformation for Hudi would be the way to go.

What I understood is that the dbt-spark hudi package makes it easy to operate (query) Hudi tables through Dbt models.
But, I am trying to solve the problem of creating Hudi tables easily while leveraging all the Hudi features.

The more time I spent with both Hudi and Airbyte, the more I realized how similar the platforms are, or at least how similar their required feature sets are. Combining them can be powerful.

It is true that Airbyte is more general while Hudi is very specific (a single destination: the data lake). But if we consider Hudi as a subset of an ELT platform, most of the features overlap.

I have detailed the parallels between Airbyte and Apache Hudi Replication below.
So, I started implementing Hudi as a generalization of the Airbyte replication protocol. The data path will be handled by Hudi.

Extending Airbyte in the way I did is currently not supported, nor is it on the near-term roadmap from what I understand. Based on the feedback and interest from the maintainers and community, I will propose this formally to the Airbyte team.

But why ?

We need to understand the Airbyte model first. It implements an improved design based on data integration connectors as containers and a protocol for schema discovery and piping data between connectors.
As it is container-based, connector code can be implemented in any language, and Airbyte orchestrates the connection in a cloud-native way (read: Kubernetes).
The data is piped from source to destination as Airbyte messages. But Apache Hudi is based on distributed execution engines like Apache Spark and Apache Flink, and it handles its own data sources and destinations (lake storage).
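
For context, the protocol is essentially newline-delimited JSON AirbyteMessages exchanged over stdin/stdout between connector containers. Here is a rough Java sketch of what a source emits; the stream name and field values are made up.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Minimal illustration of the wire format: a source writes one JSON-encoded
// AirbyteMessage per line to stdout, and the destination reads them from stdin.
public class ProtocolSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    // A RECORD message carrying one row of a hypothetical "users" stream.
    ObjectNode recordMsg = mapper.createObjectNode();
    recordMsg.put("type", "RECORD");
    ObjectNode payload = recordMsg.putObject("record");
    payload.put("stream", "users");
    payload.put("emitted_at", System.currentTimeMillis());
    payload.putObject("data").put("id", 1).put("email", "a@example.com");
    System.out.println(mapper.writeValueAsString(recordMsg));

    // A STATE message: the destination echoes it back once the data is durably
    // stored, which is how Airbyte checkpoints a sync.
    ObjectNode stateMsg = mapper.createObjectNode();
    stateMsg.put("type", "STATE");
    stateMsg.putObject("state").putObject("data").put("users_cursor", "2022-01-19");
    System.out.println(mapper.writeValueAsString(stateMsg));
  }
}
```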

Scaling Airbyte replication to big data bulk loads is not always possible, at least not yet.

There are multiple ways in which Airbyte and Hudi can be integrated.
I am sure there are proposals in the works; the Hudi roadmap has an Airbyte RFC incoming.

1) Hudi as a custom transformation in Airbyte, further extended by the dbt-spark package
2) Airbyte destination as a Hudi-supported source (S3, Kafka), with the Hudi sync orchestrated by a different scheduler (Airflow, cron, Step Functions, etc.)
3) An Airbyte source in Apache Hudi itself, similar to the Kafka source
4) Finally, Hudi as an extension of the Airbyte replication protocol, orchestrated like any other Airbyte replication

I explored the last of the above-listed options (4), mainly because it makes Airbyte more extensible, not only for Apache Hudi but for other use cases as well.
And it is strategic to try integrating 'the new open-source standard to sync data' with 'a rich platform to build streaming data lakes with incremental data pipelines on a self-managing database layer, while being optimized for lake engines and regular batch processing'.

Hudi can be run in continuous mode, running its services inline.
This mode is not considered for now. Similarly, Airbyte does not currently support streaming.

Technical Details

High level design outline

- Minor changes to the platform protocol specifications are needed. The connector specification will have 'supportedSources' and 'supportedDestinations' fields.
- A new connector concept called a 'Replication launcher' will be added to Airbyte.
- It must be possible to develop a replication launcher in any language, similar to sources and destinations.
- If a replication launcher configuration is added to an Airbyte connection, the Airbyte service invokes the corresponding container image.
- Inputs to the replication will be the existing sync input plus new connector-specific replication configurations.
- Based on the replication configuration, the job will invoke child workflows routed to the corresponding Temporal task queues.
- The replication launcher is implemented as a library but will be hosted in new Airbyte workers deployed as separate, independent services.
- The replication launcher will be provided with a subset of the worker environment, the job root, and the process factory.
- Replication logic can be in-process or out-of-process. Implementations that call out to cloud providers can run in-process.
- Replication logic will implement its own data-shoveling from sources to destinations, but will still emit Airbyte Protocol state messages so that progress can be tracked like any Airbyte replication.
- The replication launcher will be orchestrated by modified replication workers called AirbyteDelegatedReplicationWorker.
- This is similar in spirit to the recent addition of the out-of-worker container orchestrator.
- To allow MIT-licensed replication connector implementations, a few configuration classes will need to be moved out of the platform modules, which fall under ELv2.
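
To make the first point concrete, here is an illustrative Java shape for the extended connector specification. The 'supportedSources' and 'supportedDestinations' fields come from the outline above; everything else is a stand-in, not Airbyte's actual generated model.

```java
import java.util.List;

// Illustrative shape of the extended connector specification. Airbyte's real
// specification is a JSON-schema-backed generated model; this record only
// shows the two proposed additions alongside a connector's usual fields.
public record ReplicationLauncherSpecification(
    String dockerImage,                 // hypothetical, e.g. "example/hudi-replication-launcher:0.1"
    Object connectionSpecification,     // the usual JSON-schema configuration block
    List<String> supportedSources,      // source connector types this launcher can read from
    List<String> supportedDestinations  // destination connector types it can write to
) {}
```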

Pros & Cons

Pros:

- By allowing a different replication implementation within Airbyte, all of the platform's features (connector configuration, schema discovery, scheduling, logging, secret management, and the awesome UI) can serve Apache Hudi.
- Airbyte can become the single standard platform entry point to other data integration services through extensions/adapters (AWS DMS, GCP Datastream, etc.), something like https://github.com/virtual-kubelet/virtual-kubelet
- More use cases unlocked. Example: orchestrate with Airbyte and use Amazon Redshift UNLOAD as a replication to sync huge datasets into S3.

Cons:

- It deviates from current Airbyte roadmaps and efforts.
- It adds more concepts, some of which may not have direct mappings.
- UI changes are needed.
- More maintenance and support.

Implementation

I implemented the above design by creating the minimal interface needed to generalize the replication. The non-exhaustive snippets below convey the main idea.
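
In condensed form, the idea looks something like the following sketch. Apart from AirbyteDelegatedReplicationWorker, which is named in the design outline, the names and signatures here are illustrative rather than the exact implementation.

```java
import java.nio.file.Path;
import java.util.function.Consumer;

// Minimal launcher contract: given the sync input and the job workspace,
// run the replication end to end and emit Airbyte STATE messages (as raw
// JSON strings here) so the platform can checkpoint progress as usual.
interface ReplicationLauncher {
  void replicate(String replicationInputJson, Path jobRoot, Consumer<String> stateMessageConsumer)
      throws Exception;
}

// The delegated worker hands the data path over to the launcher instead of
// piping records between a source container and a destination container itself.
class AirbyteDelegatedReplicationWorker {
  private final ReplicationLauncher launcher;

  AirbyteDelegatedReplicationWorker(ReplicationLauncher launcher) {
    this.launcher = launcher;
  }

  void run(String replicationInputJson, Path jobRoot) throws Exception {
    launcher.replicate(replicationInputJson, jobRoot, state -> {
      // In the real platform this would go through the existing state-tracking
      // machinery; printing stands in for that here.
      System.out.println(state);
    });
  }
}
```

A Hudi-backed launcher would translate the replication input into Hudi writer configuration, run the Spark job, and emit state once the commits succeed.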

Next steps

You can watch the demo implementation here. I would love to receive feedback from the community and the maintainers.

Although all I did was map Airbyte configuration to Apache Hudi configuration, this has become an elaborate blog post. :)
I hope you found it helpful!

