Big Data Analytics · PySpark
Translating Batch Processing to Stream Processing
An AST-based converter that automatically rewrites a Spark batch job into an equivalent Spark Streaming (DStream) job, with no manual rewrite required.
01 Overview
This project demonstrates how to convert traditional batch processing code into stream processing code automatically, using a custom AST-based converter. It shows both modes running locally, side by side, against the same transformation logic.
Batch Mode
Reads a static CSV file once with Spark RDDs, filters and aggregates it, then writes the result to disk.
Stream Mode
Watches a directory for new files and applies the same filter & aggregation logic continuously as data arrives.
AST Converter
Parses batch.py into a Python AST, transforms specific nodes, and regenerates a working stream.py.
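Before anything can be rewritten, the converter has to recognize the map · filter · map · reduceByKey chain in the parsed tree. Below is a minimal sketch of that inspection step. It assumes the batch job is written as top-level assignments like the batch.py in section 04. method_chain and find_chains are hypothetical helpers, not code from ast_converter.py.

```python
import ast
from typing import Dict, List

def method_chain(node: ast.AST) -> List[str]:
    """Unwind a fluent call like rdd.map(...).filter(...) into method names, first call first."""
    names: List[str] = []
    # Each link is Call(func=Attribute(value=<previous link>, attr=<method name>))
    while isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
        names.append(node.func.attr)
        node = node.func.value
    return names[::-1]

def find_chains(source: str) -> Dict[str, List[str]]:
    """Map each top-level assigned variable to the method chain on its right-hand side."""
    chains: Dict[str, List[str]] = {}
    for stmt in ast.parse(source).body:
        if isinstance(stmt, ast.Assign) and isinstance(stmt.targets[0], ast.Name):
            chain = method_chain(stmt.value)
            if chain:  # skip plain attribute reads such as sc = spark.sparkContext
                chains[stmt.targets[0].id] = chain
    return chains
```

Applied to batch.py, find_chains should map rdd to ['textFile'] and rdd_filtered to ['map', 'filter', 'map', 'reduceByKey']. That second list is the span the converter wraps.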
02 Architecture
The same pipeline runs in two modes, joined by one automated translation step.
Batch runs the path once and stops. Stream keeps the same path open: every new file that lands in stream_input/ flows through it in the next 5-second micro-batch.
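Conceptually, the stream side is a polling loop. On every interval, only files that have not been seen before form the next micro-batch. The plain-Python sketch below illustrates that idea and nothing more. Spark's own file tracking in textFileStream is based on file modification times and is more involved. new_files and run_micro_batches are hypothetical names.

```python
import time
from pathlib import Path
from typing import Callable, List, Optional, Set

def new_files(directory: Path, seen: Set[str]) -> List[Path]:
    """Return files in `directory` that have not been handed out before, marking them as seen."""
    fresh = sorted(p for p in directory.iterdir() if p.is_file() and p.name not in seen)
    seen.update(p.name for p in fresh)
    return fresh

def run_micro_batches(directory: Path, interval: float,
                      handle: Callable[[List[Path]], None],
                      max_batches: Optional[int] = None) -> None:
    """Every `interval` seconds, pass the files that arrived since the last poll to `handle`."""
    seen: Set[str] = set()
    batches = 0
    while max_batches is None or batches < max_batches:
        batch = new_files(directory, seen)
        if batch:  # skip intervals in which nothing arrived
            handle(batch)
        batches += 1
        time.sleep(interval)
```

Batch mode corresponds to calling handle once on a fixed file list. Stream mode corresponds to leaving this loop running.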
What the converter actually rewrites
- Inserts a StreamingContext import and creates ssc right after the Spark context.
- Rewrites sc.textFile(...) → ssc.textFileStream("stream_input/"), retargeting the result to lines.
- Wraps the map · filter · map · reduceByKey chain inside lines.foreachRDD(...), ending it with a .foreach(...) that prints each (key, count) pair.
- Replaces saveAsTextFile(...) with ssc.start() + ssc.awaitTermination().
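The four rules above can be sketched with the standard-library ast module. This is a simplified illustration, not the project's ast_converter.py. It assumes the exact statement shapes of the batch.py in section 04. It also uses ast.unparse (Python 3.9+) in place of astor. The names convert, _root_name and _stmts are hypothetical.

```python
import ast
from typing import List, Optional

STREAM_DIR = "stream_input/"  # watched directory, as in the generated stream.py
BATCH_SECONDS = 5             # micro-batch interval, as in the generated stream.py

def _stmts(code: str) -> List[ast.stmt]:
    """Parse a code template into statement nodes."""
    return ast.parse(code).body

def _root_name(expr: ast.AST) -> Optional[str]:
    """Follow a chain like rdd.map(...).filter(...) down to its root variable name."""
    while isinstance(expr, ast.Call) and isinstance(expr.func, ast.Attribute):
        expr = expr.func.value
    return expr.id if isinstance(expr, ast.Name) else None

def convert(batch_source: str) -> str:
    tree = ast.parse(batch_source)
    body: List[ast.stmt] = []
    source_var = None  # name bound to sc.textFile(...), e.g. "rdd"
    for stmt in tree.body:
        value = stmt.value if isinstance(stmt, (ast.Assign, ast.Expr)) else None
        is_method_call = isinstance(value, ast.Call) and isinstance(value.func, ast.Attribute)
        method = value.func.attr if is_method_call else None
        if method == "textFile" and isinstance(stmt, ast.Assign):
            # Rule 2: read from the watched directory instead, into `lines`
            source_var = stmt.targets[0].id
            body += _stmts(f"lines = ssc.textFileStream({STREAM_DIR!r})")
        elif method == "saveAsTextFile":
            # Rule 4: replace the file sink with starting the streaming context
            body += _stmts("ssc.start()\nssc.awaitTermination()")
        elif (isinstance(stmt, ast.Assign) and is_method_call and source_var
              and _root_name(value) == source_var):
            # Rule 3: run the chain on every micro-batch RDD and print each result
            chain = ast.unparse(value)
            body += _stmts(f"lines.foreachRDD(lambda {source_var}: "
                           f"{chain}.foreach(lambda result: print(result)))")
        else:
            body.append(stmt)
            # Rule 1: create ssc right after the SparkContext (exact-match simplification)
            if ast.unparse(stmt) == "sc = spark.sparkContext":
                body += _stmts(f"ssc = StreamingContext(sc, {BATCH_SECONDS})")
    tree.body = _stmts("from pyspark.streaming import StreamingContext") + body
    return ast.unparse(tree)
```

Matching on method names and an exact source string keeps the sketch short. The trade-off is that any batch job shaped differently from batch.py passes through unchanged.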
03 End-to-End Workflow
From an empty terminal to a live streaming job — the same sequence regardless of which path you're running.
- Clone repo
- Install deps
- Run batch.py
- AST convert
- Run stream.py
- Feed stream_input/
- Live console output
04 Batch vs. Stream Code
The exact same business logic, expressed two ways: keep rows where column 3 (fields[2]) is greater than 50, then count occurrences of each value in column 2 (fields[1]).
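batch.py
from pyspark.sql import SparkSession

# Local Spark session; the RDD API is reached through its SparkContext
spark = SparkSession.builder.appName("ExampleRDD").getOrCreate()
sc = spark.sparkContext

# Read the static input once
rdd = sc.textFile("batch_input/data.csv")
# Split each CSV line, keep rows whose 3rd field > 50, count rows per 2nd field
rdd_filtered = rdd.map(lambda line: line.split(",")) \
    .filter(lambda fields: int(fields[2]) > 50) \
    .map(lambda fields: (fields[1], 1)) \
    .reduceByKey(lambda x, y: x + y)
# One partition -> one part file; saveAsTextFile creates a directory named output.csv
rdd_filtered.coalesce(1).saveAsTextFile("batch_output/output.csv")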
stream.py (generated by ast_converter.py)
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ExampleRDD').getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 5)
lines = ssc.textFileStream('stream_input/')
lines.foreachRDD(lambda rdd: rdd.map(lambda line: line.split(','))
    .filter(lambda fields: int(fields[2]) > 50)
    .map(lambda fields: (fields[1], 1))
    .reduceByKey(lambda x, y: x + y)
    .foreach(lambda result: print(result)))
ssc.start()
ssc.awaitTermination()
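To check results on small inputs without starting Spark, the same logic fits in a few lines of plain Python. This is a hypothetical reference helper, not part of the repository. Like the Spark chain, it raises ValueError if the third field is not an integer, for example on a header row.

```python
from collections import Counter
from typing import Dict, Iterable

def count_over_threshold(lines: Iterable[str], threshold: int = 50) -> Dict[str, int]:
    """Split CSV lines, keep rows whose 3rd field > threshold, count rows per 2nd field."""
    counts: Counter = Counter()
    for line in lines:
        fields = line.split(",")
        if int(fields[2]) > threshold:   # filter(lambda fields: int(fields[2]) > 50)
            counts[fields[1]] += 1       # map to (fields[1], 1) + reduceByKey(add)
    return dict(counts)
```

For example, with open("data1.csv") as f: count_over_threshold(f.read().splitlines()). Compare results as dictionaries, because reduceByKey does not guarantee output order. stream.py aggregates inside foreachRDD, so each printed set of counts covers only the files picked up in that interval, not a running total. Adding per-batch results with Counter should match the batch logic run over the combined files.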
05 Getting Started
Everything runs locally. No cluster, no cloud account.
- Clone the repository
  git clone https://github.com/SRIKANTH284/Data_Engineering_project.git
- Open the project
  Open the cloned folder in VS Code (or your preferred editor).
- Clean previous output
  Delete batch_output/ (it is recreated by batch.py, and saveAsTextFile fails if the output directory already exists) and delete stream.py (it is regenerated by the converter).
- Enter the project directory
  cd Data_Engineering_project
- Install dependencies
  pip3 install -r requirements.txt
- Run the batch job
  python3 batch.py
  Creates batch_output/ with the batch-processed result of batch_input/data.csv.
- Generate the streaming version
  python3 ast_converter.py batch.py
  Parses batch.py and writes a new stream.py.
- Run the streaming job
  python3 stream.py
- Simulate streaming data
  In a second terminal (also in the project directory), drop CSVs into stream_input/ one at a time:
  cp data1.csv stream_input/data1.csv
  cp data2.csv stream_input/data2.csv
  Each file is picked up, filtered, and aggregated. Results print to the console once per 5-second micro-batch, and each set of counts covers only the files that arrived in that interval.
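The cp commands are enough for a local demo. However, the Spark Streaming guide asks for files to be moved or renamed into the watched directory atomically, so that a half-written file is never picked up. Below is a minimal sketch of that approach. It assumes the watched directory and its parent are on the same filesystem, where a rename is atomic on POSIX. drop_atomically is a hypothetical helper, not part of the repository.

```python
import shutil
from pathlib import Path

def drop_atomically(src: Path, watch_dir: Path) -> Path:
    """Copy `src` into `watch_dir` so that it appears there in a single rename."""
    watch_dir.mkdir(parents=True, exist_ok=True)
    # Stage the copy next to (not inside) the watched directory, assumed to be on the
    # same filesystem, so the final rename is atomic and no partial file is ever visible.
    staging = watch_dir.parent / f".{src.name}.staging"
    shutil.copyfile(src, staging)
    return staging.replace(watch_dir / src.name)
```

Usage from the project root: drop_atomically(Path("data1.csv"), Path("stream_input")). The shell equivalent is to copy to a temporary name outside stream_input/ and then mv the file in.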
06 Project Structure
How the repository's files relate to each other — what reads what, and what gets generated for you.
Data_Engineering_project/
├── batch_input/
│ └── data.csv
├── batch_output/ (auto-generated by batch.py)
├── stream_input/ (drop data1.csv / data2.csv here to simulate streaming)
├── batch.py (Spark RDD batch job)
├── ast_converter.py (AST transformer: batch.py -> stream.py)
├── stream.py (auto-generated streaming job)
├── data1.csv
├── data2.csv
├── requirements.txt
└── README.md
07 Tech Stack
Python 3
Core language for all scripts and the AST tooling.
PySpark
Spark RDD API for batch, Spark Streaming for the continuous job.
Python ast
Parses batch.py into a syntax tree for inspection and rewriting.
astor
Converts the transformed AST back into runnable Python source.
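astor covers the last step: turning the rewritten tree back into source text. Since Python 3.9, the standard library's ast.unparse does the same job. The sketch below uses it instead of astor, only to show what a round trip does to layout. Comments are not stored in the AST, so they are lost, and string quoting is normalized. This likely explains why the generated stream.py uses single quotes where batch.py uses double quotes. regenerate is a hypothetical helper.

```python
import ast

def regenerate(source: str) -> str:
    """Round-trip source code through the AST, as a converter's final step would."""
    return ast.unparse(ast.parse(source))

if __name__ == "__main__":
    print(regenerate('app = "ExampleRDD"  # comment\n'))  # prints: app = 'ExampleRDD'
```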
argparse
Command-line interface for pointing the converter at a batch file.
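The command shown in Getting Started takes a single positional argument. Below is a minimal argparse sketch of such an interface that assumes nothing beyond that. The -o/--output option is hypothetical and only illustrates how the default stream.py name could be made configurable.

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Convert a Spark batch job into a Spark Streaming job.")
    parser.add_argument("batch_file", help="path to the batch script, e.g. batch.py")
    # Hypothetical option: the README only shows the positional argument.
    parser.add_argument("-o", "--output", default="stream.py",
                        help="where to write the generated streaming script")
    return parser
```

build_parser().parse_args(["batch.py"]) yields batch_file='batch.py' and output='stream.py'. This matches the documented python3 ast_converter.py batch.py invocation.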
py4j
Bridges the PySpark API to the underlying JVM Spark engine.
Conclusion
This project shows how batch processing can be transformed into stream processing using Python automation and Spark Streaming (the DStream API). It is easy to run locally and offers a clear way to learn how big data pipelines move from one execution model to the other.