Scalable data pipelines with dagster and pyspark
Welcome 😄 to this blog series about the modern data stack ecosystem.
post series overview
This is part 5 of the modern data stack blog post series. Here, the integration of pyspark into data pipelines is shown.
The other posts in this series:
- overview about this whole series including code and dependency setup
- basic introduction (from hello-world to simple pipelines)
- assets: turning the data pipeline inside out using software-defined-assets to focus on the things we care about: the curated assets of data and not intermediate transformations
- a more fully-fledged example integrating multiple components including resources, an API as well as DBT
- integrating jupyter notebooks into the data pipeline using dagstermill
- working on scalable data pipelines with pyspark
- ingesting data from foreign sources using Airbyte connectors
- SFTP sensor reacting to new files
scalable data processing - don't!
- the first rule of distributed systems is not to use distributed systems
- the second rule is to use distributed systems only if absolutely necessary to accomplish the desired use case
The big data and scalable data processing ecosystem has flourished in recent years. For some people, data larger than an Excel spreadsheet is considered large; for others, only data in the range of multiple hundreds of gigabytes, terabytes, or perhaps petabytes is deemed big.
The tooling has to match your own notion of big data, because the requirements change drastically once the data outgrows the limits of a single machine. Before reaching for a distributed system, though, some Python-native data processing tools deserve a mention for their speed and scalability beyond standard pandas dataframes:
- Dask: provides advanced parallelism for analytics, enabling performance at scale for the tools you love
- RAPIDS cuDF: a GPU DataFrame library
- Polars - a lightning-fast DataFrame library for Rust and Python
Always consider the quote:
Big RAM is eating big data
The distributed data ecosystem was born years ago, when the ratio of dataset size to available RAM was skewed to the costly side. This has changed: today RAM, even in the terabyte range, is very cheap. Furthermore, accelerators such as GPUs with tens of gigabytes of memory have become affordable.
This development changes the landscape of the scalable data processing ecosystem. Choose your tools wisely: a very scalable distributed tool like Spark can be complex to fine-tune, and on a single machine in particular it can be much slower than dedicated, modern solutions for such use cases.
Apache Spark, used here via PySpark, is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.
Nonetheless, distributed processing is necessary for some gigantic datasets to be able to perform the desired computations. Even in such a scenario, Dagster can be helpful to orchestrate such large-scale computations and their integration in your overall data-processing landscape.
dagster and Spark
Dagster can interface in two modes with Spark:
- External: The create_shell_command_op can be used to create an operation that invokes spark-submit. Metadata output is not generated automatically; manual materialization events should be added to make the most of the regular dagster features. However, this also eases the gradual adoption of dagster in the case of JVM-based Spark jobs.
- Internal: A regular dagster op/operation can submit a job or parts of a Spark job.
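For the external mode, the following is a minimal sketch of how a spark-submit invocation could be assembled as one shell-safe string. The helper name build_spark_submit_command, the script path, and the configuration values are hypothetical and chosen only for illustration; only the spark-submit options --master and --conf are taken from Spark itself.

```python
import shlex


def build_spark_submit_command(application, master="local[*]", conf=None, app_args=()):
    """Assemble a spark-submit invocation as a single shell-safe string."""
    parts = ["spark-submit", "--master", master]
    # Each Spark setting is passed as its own --conf key=value pair.
    for key, value in sorted((conf or {}).items()):
        parts += ["--conf", f"{key}={value}"]
    # The application (jar or .py file) follows the options, then its own arguments.
    parts.append(application)
    parts.extend(app_args)
    # shlex.join quotes every part that the shell would otherwise interpret.
    return shlex.join(parts)


command = build_spark_submit_command(
    "jobs/aggregate_events.py",  # hypothetical script path
    conf={"spark.executor.memory": "4g"},  # hypothetical setting
    app_args=["--date", "2022-01-01"],
)
print(command)
```

Assuming the dagster-shell package is installed, a string like this could then be handed to create_shell_command_op as the shell command; check the exact signature against the version you have installed.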
In the internal mode, you need to understand the concept of IO managers and their implications for Spark jobs.
Assuming you are processing a huge dataset, a distributed system like Spark is using a couple of techniques to speed up the processing:
- partition push-down: read only the (time) buckets of data which are absolutely necessary
- projection: read only the columns which are really required for any particular job
- lazy computation & reordering of tasks: Perform cost-based optimization of the job to optimize joins and perhaps further push down specific tasks as close to the source as possible
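The techniques listed above can be illustrated with a toy model in plain Python. This is not how Spark is implemented; LazyPlan, PARTITIONS, and the sample rows are hypothetical names and data. The sketch only shows the principle: because the plan is recorded first and executed later, unneeded partitions and columns are never touched.

```python
from dataclasses import dataclass, field
from typing import Optional

# Toy storage: one list of rows per date partition (hypothetical data).
PARTITIONS = {
    "2022-01-01": [{"name": "Thom", "age": 51, "city": "Oxford"}],
    "2022-01-02": [
        {"name": "Jonny", "age": 48, "city": "Oxford"},
        {"name": "Nigel", "age": 49, "city": "London"},
    ],
}


@dataclass
class LazyPlan:
    """Records what should happen; nothing is read until collect()."""

    wanted_partitions: Optional[frozenset] = None  # None means all partitions
    wanted_columns: Optional[tuple] = None  # None means all columns
    partitions_read: list = field(default_factory=list)

    def where_partition(self, *keys):
        return LazyPlan(frozenset(keys), self.wanted_columns)

    def select(self, *columns):
        return LazyPlan(self.wanted_partitions, columns)

    def collect(self):
        rows = []
        for key, partition in PARTITIONS.items():
            if self.wanted_partitions is not None and key not in self.wanted_partitions:
                continue  # partition push-down: skipped without being read
            self.partitions_read.append(key)
            for row in partition:
                columns = tuple(row) if self.wanted_columns is None else self.wanted_columns
                rows.append({c: row[c] for c in columns})  # projection
        return rows


# Lazy: the whole plan is known before anything is read.
lazy = LazyPlan().where_partition("2022-01-02").select("name")
lazy_rows = lazy.collect()

# Eager: collecting right away reads every partition and every column.
eager = LazyPlan()
all_rows = eager.collect()

print(lazy.partitions_read, eager.partitions_read)
```

The lazy plan reads a single partition and a single column, whereas the eager one reads everything, which is the situation an intermediate materialization creates.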
None of the three works well with a regular IO manager in dagster. Typically, one wants to materialize intermediate steps in dagster to a persistent storage volume, so that jobs can continue if one op fails (and to speed up debugging). But materializing every step breaks all three optimizations, because Spark can no longer optimize the computation graph as a whole.
However, dagster allows choosing the IO manager for each operation. With an in-memory IO manager, the reference to the non-materialized large dataframe is forwarded from one operation to the next. Only the final operation uses a different (actually materializing) IO manager. By following this pattern, you can profit from both worlds:
- Spark: automatic optimization of the compute graph for very large-scale computations
- Dagster: resource handling, easy testability by separating business logic from the underlying compute
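from dagster import Out, op
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

@op(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def make_people(context) -> DataFrame:
    # Build a small demo DataFrame with the Spark session provided by the pyspark resource.
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    return context.resources.pyspark.spark_session.createDataFrame(rows, schema)

@op(required_resource_keys={"pyspark_step_launcher"}, out=Out(io_manager_key="io_manager_materialize_final"))
def filter_over_50(people: DataFrame) -> DataFrame:
    # Only this final output is written out, through the dedicated materializing IO manager.
    return people.filter(people["age"] > 50)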
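To make the effect of the IO manager choice more concrete, here is a toy model in plain Python. It is only a sketch under stated assumptions: LazyRows, InMemoryIOManager, JsonFileIOManager, and run_pipeline are hypothetical stand-ins, and although the method names handle_output and load_input mirror dagster's IO manager interface, the real methods take context objects instead of a plain key. The events log shows when the source is actually evaluated.

```python
import json
import os
import tempfile

events = []  # chronological log showing *when* the source is evaluated


def expensive_source():
    events.append("source evaluated")
    return [{"name": "Thom", "age": 51}, {"name": "Jonny", "age": 48}]


class LazyRows:
    """Stand-in for a lazy DataFrame: work happens only in collect()."""

    def __init__(self, thunk):
        self._thunk = thunk

    def filter(self, predicate):
        return LazyRows(lambda: [row for row in self._thunk() if predicate(row)])

    def collect(self):
        return self._thunk()


class InMemoryIOManager:
    """Hands the very same (still lazy) object to the next step."""

    def __init__(self):
        self._store = {}

    def handle_output(self, key, obj):
        self._store[key] = obj

    def load_input(self, key):
        return self._store[key]


class JsonFileIOManager:
    """Writes the step output to disk, which forces evaluation."""

    def __init__(self, directory):
        self._directory = directory

    def _path(self, key):
        return os.path.join(self._directory, f"{key}.json")

    def handle_output(self, key, obj):
        with open(self._path(key), "w") as handle:
            json.dump(obj.collect(), handle)

    def load_input(self, key):
        with open(self._path(key)) as handle:
            rows = json.load(handle)
        return LazyRows(lambda: rows)


def run_pipeline(intermediate_io):
    events.clear()
    # Step 1 hands over its (lazy) output through the IO manager.
    intermediate_io.handle_output("people", LazyRows(expensive_source))
    events.append("step 1 finished")
    # Step 2 loads the input, filters it, and triggers the computation.
    people = intermediate_io.load_input("people")
    result = people.filter(lambda row: row["age"] > 50).collect()
    return result, list(events)


in_memory_result, in_memory_events = run_pipeline(InMemoryIOManager())
with tempfile.TemporaryDirectory() as tmp:
    file_result, file_events = run_pipeline(JsonFileIOManager(tmp))

print(in_memory_events)
print(file_events)
```

Both variants return the same rows. With the in-memory manager the source is evaluated only in the final step, so an engine would still see the whole plan; with the file-based manager it is evaluated at the hand-over between the steps.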
The end-to-end job needs to read the required resources from the configuration. Here, a local Spark instance is created to simplify the demonstration.
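from dagster import graph, in_process_executor, mem_io_manager
from dagster_pyspark import pyspark_resource

# no_step_launcher ships with dagster (its import path varies between versions);
# parquet_io_manager is assumed to be a user-defined IO manager that is not shown in this post.
local_resource_defs = {
    "pyspark_step_launcher": no_step_launcher,
    "pyspark": pyspark_resource.configured({"spark_conf": {"spark.default.parallelism": 1}}),
    "io_manager_materialize_final": parquet_io_manager.configured({"path_prefix": "warehouse_location"}),
    "io_manager": mem_io_manager,  # default: pass dataframe references in memory
}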
@graph
def make_and_filter_data():
    pp = make_people()
    filter_over_50(pp)
    # filter_over_60 is not shown above; it is assumed to be defined analogously to filter_over_50.
    filter_over_60(pp)
pyspark_sample_job_local = make_and_filter_data.to_job(
    name="pyspark_sample_job_local",
    resource_defs=local_resource_defs,
    executor_def=in_process_executor,
)
When running the pyspark_sample_job_local job, you can visit http://localhost:4040 and explore Spark's UI (while the job waits for its 50-second sleep timer to finish).
Further details are found here: https://docs.dagster.io/integrations/spark#using-dagster-with-spark and in https://github.com/dagster-io/dagster/discussions/6899.
summary
Use a truly distributed computation engine only if it is required. Otherwise, there are better and simpler options. If a distributed engine like Spark is actually required, try to profit from the usability and testability improvements of dagster by following this pattern:
- Spark: automatic optimization of the compute graph for very large-scale computations
- Dagster: resource handling, easy testability by separating business logic from the underlying compute