
Running Google’s Cloud Data Fusion batch pipelines at “scale”


A typical CDF pipeline to move data from legacy SQL Server to BigQuery. Image by Author

Making Sense of Big Data

Testing Cloud Data Fusion for data science workflows

TLDR: When submitting batch Cloud Data Fusion (CDF) pipelines at scale via the REST API, pause for a few seconds between each call to allow CDF to catch up.

Background: as part of a migration we’re involved in, our data science team is migrating hundreds of legacy MS SQL Server ODS tables into BigQuery. While our engineering team is handling the actual migration, we (the DS team) want the data, and control of the data, ourselves so we can build, prototype, and migrate our models in GCP quickly, without waiting on all of the quality and wide-scope requirements that our engineering team is tasked with. Enter Google’s Cloud Data Fusion. Based on CDAP, it’s a great solution to our team’s issues: we want to copy legacy tables into BigQuery on a regular cadence to serve our models. We, the data science team, have full control over scheduling and scope, and the CDF "no code" interface makes it straightforward for all team members to leverage the power of Spark. It’s easy to add an additional pipeline by following a few simple standards.

The Problem: at the outset of this migration, we identified all tables and sources we’d be copying to BigQuery, which was basically the scope of the migration. We had about 100 tables, and we built one pipeline per table, so ~100 pipelines in total. The pipeline run cadence varied between daily and monthly, with most tables somewhere in between. Things worked fine for a while, until we built a model that required 60 tables, virtually all loaded at once. I won’t address why we’re building a model that requires 60 tables, but rather emphasize how this requirement makes for an interesting test of the scalability of Cloud Data Fusion.

At this point, we can all agree that CDF is not the best tool to execute 60 pipelines simultaneously, each migrating a single table and many finishing in tens of minutes, but this is the situation we found ourselves in. CDF is not well suited to this task for a lot of reasons, spinning up 60 small MR/Spark clusters for mostly 10-minute migration tasks being just one. In reality, though, the provisioning time for these MR/Spark clusters (Dataproc in GCP, EMR in AWS) is only 2–3 minutes, and the cost actually isn’t much at all. We started by building a list of pipelines, then looping through that list in a bash script and starting each pipeline via CDAP’s REST API from a GCE VM. While we eventually scheduled these asynchronously, we wanted to prove that we could execute things ~simultaneously and that CDF could scale up to our needs.
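Each iteration of that loop comes down to one POST per pipeline. Here is a minimal Python sketch of that call (our own script was bash), assuming the standard CDAP route that starts a batch pipeline’s `DataPipelineWorkflow`. The endpoint, namespace, token, and function names are placeholders for illustration, not values from our setup.

```python
import urllib.request
from urllib.parse import quote


def start_url(endpoint: str, namespace: str, pipeline: str) -> str:
    """Build the CDAP REST URL that starts one deployed batch pipeline."""
    return (
        f"{endpoint.rstrip('/')}/v3/namespaces/{quote(namespace, safe='')}"
        f"/apps/{quote(pipeline, safe='')}/workflows/DataPipelineWorkflow/start"
    )


def build_start_request(
    endpoint: str, namespace: str, pipeline: str, token: str
) -> urllib.request.Request:
    """Prepare (but do not send) the authenticated POST request."""
    return urllib.request.Request(
        start_url(endpoint, namespace, pipeline),
        method="POST",
        headers={"Authorization": f"Bearer {token}"},
    )


def start_pipeline(endpoint: str, namespace: str, pipeline: str, token: str) -> int:
    """Send the start request and return the HTTP status code.

    Raises urllib.error.HTTPError if the server rejects the call.
    """
    request = build_start_request(endpoint, namespace, pipeline, token)
    with urllib.request.urlopen(request, timeout=30) as response:
        return response.status
```

The token would typically be an OAuth access token for the account allowed to call the instance (for example, the output of `gcloud auth print-access-token`), and the endpoint is the API endpoint of the CDF instance.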

What we found initially disappointed us. Our script issued the start-pipeline commands via the REST API all within a second or so, and of the 60 pipelines, 25 failed with almost no logs. As we parsed through CDF’s App Fabric Service logs, we encountered an interesting error:

2020-10-30 22:27:34,715 - WARN [pool-10-thread-1:i.c.c.i.a.s.RunRecordCorrectorService@148] - Fixed 25 RunRecords with status in [STARTING, RUNNING, SUSPENDED], but the programs are not actually running

2020-10-30 22:27:34,716 - INFO [pool-10-thread-1:i.c.c.i.a.s.RunRecordCorrectorService@103] - Corrected 25 run records with status in [STARTING, RUNNING, SUSPENDED] that have no actual running program. Such programs likely have crashed or were killed by external signal
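If you want to check your own App Fabric logs for the same signature, a small filter is enough. The sketch below is only an illustration: the regular expression is written against the WARN line quoted above, and the function name is ours, so other CDAP versions may word the message differently.

```python
import re
from typing import List, Optional, Tuple

# Pattern derived from the single WARN line quoted above; an assumption,
# not a documented log format.
CORRECTED = re.compile(
    r"RunRecordCorrectorService@\d+\].*?"
    r"Fixed (\d+) RunRecords with status in \[([^\]]*)\]"
)


def corrected_run_records(line: str) -> Optional[Tuple[int, List[str]]]:
    """Return (count, statuses) if the line reports corrected run records."""
    match = CORRECTED.search(line)
    if match is None:
        return None
    statuses = [s.strip() for s in match.group(2).split(",") if s.strip()]
    return int(match.group(1)), statuses
```

A non-zero count right after a burst of start calls is the hint that the run records, not the clusters, are what went missing.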

A Dataproc cluster that hosted one of our failed pipelines. The clusters of successful and failed pipelines are ~identical log-wise. Image by Author
Number of YARN containers vs. time. Successful creation of Dataproc clusters is visible after 6pm. Image by Author

While CDF had received 60 "execute pipeline" API calls, it missed 25 run records. What’s interesting, though, is that when we looked at the Dataproc logs, we found that it had successfully created 60 clusters. That is to say, we observed successfully created clusters even for the pipelines that failed. This helped us realize that we didn’t have scalability issues with Dataproc cluster creation, but rather with the CDF "master". The problem is with CDF’s bookkeeping/tracking, NOT with Dataproc cluster creation. Being data scientists and not engineers or architects, we are limited in our knowledge of how this works, but we assumed we had overloaded whatever tracking DB is used when we flooded the system with 60 API calls in ~1 second.

We tried again, adding some padding between each API call. We found that waiting a probably-generous 10 seconds results in all 60 pipeline execute calls being submitted and executed successfully. While not the most elegant solution, it gets the job done for us so we can think about other things, which counts for a lot in data science.
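The padding itself is just a sleep between submissions. A minimal Python sketch of the idea follows (again, our real script was bash). Here `start` is a hypothetical callable that starts one pipeline by name, and the 10-second default simply mirrors the value that worked for us; it is not a tuned number.

```python
import time
from typing import Callable, Iterable, List, Tuple


def submit_with_pause(
    pipelines: Iterable[str],
    start: Callable[[str], object],
    pause_seconds: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[Tuple[str, bool]]:
    """Start pipelines one at a time, pausing between consecutive calls.

    Returns (pipeline_name, submitted_ok) pairs in submission order.
    A failed submission is recorded and the loop continues.
    """
    results: List[Tuple[str, bool]] = []
    for index, name in enumerate(pipelines):
        if index > 0:
            # Give CDF time to record the previous run before the next call.
            sleep(pause_seconds)
        try:
            start(name)
            results.append((name, True))
        except Exception as exc:  # keep going so one bad call does not stop the batch
            print(f"failed to start {name}: {exc}")
            results.append((name, False))
    return results
```

With 60 pipelines and a 10-second pause, submission takes a little under 10 minutes in total, which is small next to the run time of the pipelines themselves. Note that a successful submission only means the start call was accepted, so it is still worth checking run status afterwards.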


Update: we have since explored Airflow and now execute and schedule these pipelines with it on our engineering team’s Cloud Composer instance, but the approach outlined above remains a solid option when expensive and complicated orchestration resources are unavailable.

