Big Data Analytics · PySpark

Translating Batch Processing to
Stream Processing

An AST-based converter that automatically rewrites a Spark batch job into an equivalent Spark Streaming (DStream) job, with no manual rewrite required.

Python 3 · PySpark · Python AST · astor · Runs Locally

01 Overview

This project demonstrates how to convert traditional batch processing code into stream processing code automatically, using a custom AST-based converter. It shows both modes running locally, side by side, against the same transformation logic.

Batch Mode

Reads a static CSV file once with Spark RDDs, filters and aggregates it, then writes the result to disk.

Stream Mode

Watches a directory for new files and applies the same filter & aggregation logic continuously as data arrives.

AST Converter

Parses batch.py into a Python AST, transforms specific nodes, and regenerates a working stream.py.
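To make the "parses batch.py" step concrete, here is a minimal sketch of the inspection half, assuming only the standard `ast` module. It walks the tree and records every method call together with the line it starts on. The function name `spark_calls` is hypothetical and is not taken from ast_converter.py.

```python
import ast
from collections import defaultdict


def spark_calls(source):
    """Map each called method name to the sorted line numbers where it is called."""
    found = defaultdict(set)
    for node in ast.walk(ast.parse(source)):
        # A method call such as sc.textFile(...) is a Call whose func is an Attribute.
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
            found[node.func.attr].add(node.lineno)
    return {name: sorted(lines) for name, lines in found.items()}
```

Run on batch.py, the result should include textFile, map, filter, reduceByKey, coalesce and saveAsTextFile, plus incidental calls such as split and getOrCreate. A chained expression starts on the line of its receiver, so every call in a multi-line `rdd.map(...).filter(...)` chain reports the assignment's first line, which is convenient for locating whole statements.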

02 Architecture

The same pipeline runs in two modes, joined by one automated translation step.

Batch Path · runs once
batch_input/data.csv → batch.py (Spark RDD · map / filter / reduceByKey) → batch_output/output.csv

ast_converter.py · parses batch.py → rewrites AST nodes → emits stream.py

Stream Path · live
stream_input/*.csv → stream.py (Spark Streaming · textFileStream · foreachRDD) → console output

Batch runs the path once and stops. Stream keeps the same path open — every new file in stream_input/ flows through it continuously.

What the converter actually rewrites

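  • Inserts a StreamingContext import and creates ssc = StreamingContext(sc, 5), with a 5-second batch interval, right after the Spark context sc is defined.
  • Rewrites sc.textFile(...) → ssc.textFileStream("stream_input/"), retargeting the result to lines.
  • Wraps the map · filter · map · reduceByKey chain inside lines.foreachRDD(...) and appends .foreach(lambda result: print(result)) so each micro-batch's counts reach the console.
  • Replaces the saveAsTextFile(...) call with ssc.start() + ssc.awaitTermination().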
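These rewrites can be sketched as a statement-level pass over the module body. This is a minimal illustration, not the project's ast_converter.py. It assumes batch.py has exactly the shape shown in section 04 (one textFile source, one reduceByKey chain, one saveAsTextFile sink), it builds replacement statements by parsing small source templates, and it uses the standard library's `ast.unparse` (Python 3.9+) in place of astor. The names `convert`, `_is_call_to` and `_calls_method` are hypothetical.

```python
import ast

# Hypothetical sketch; assumes batch.py has exactly the shape shown in section 04.
STREAM_IMPORT = "from pyspark.streaming import StreamingContext"


def _is_call_to(node, method):
    """True if node is a call of the form <expr>.method(...)."""
    return (isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == method)


def _calls_method(stmt, method):
    """True if a call to .method(...) appears anywhere inside stmt."""
    return any(_is_call_to(node, method) for node in ast.walk(stmt))


def convert(batch_source, batch_interval=5, stream_dir="stream_input/"):
    """Return stream.py-style source for a batch.py-style script."""
    tree = ast.parse(batch_source)
    body = ast.parse(STREAM_IMPORT).body  # StreamingContext import goes first
    source_var = None
    for stmt in tree.body:
        if _calls_method(stmt, "saveAsTextFile"):
            # The batch sink becomes start + awaitTermination.
            body += ast.parse("ssc.start()\nssc.awaitTermination()").body
        elif isinstance(stmt, ast.Assign) and _is_call_to(stmt.value, "textFile"):
            # sc.textFile(...) -> ssc.textFileStream(...), bound to `lines`.
            source_var = stmt.targets[0].id  # assumes a plain name such as `rdd`
            body += ast.parse(f"lines = ssc.textFileStream({stream_dir!r})").body
        elif (source_var and isinstance(stmt, ast.Assign)
              and _calls_method(stmt, "reduceByKey")):
            # Run the same chain on every micro-batch RDD and print the results.
            chain = ast.unparse(stmt.value)
            body += ast.parse(
                f"lines.foreachRDD(lambda {source_var}: {chain}"
                ".foreach(lambda result: print(result)))").body
        else:
            body.append(stmt)
            # Create ssc right after the `sc = spark.sparkContext` assignment.
            if (isinstance(stmt, ast.Assign)
                    and isinstance(stmt.value, ast.Attribute)
                    and stmt.value.attr == "sparkContext"):
                ctx = stmt.targets[0].id
                body += ast.parse(
                    f"ssc = StreamingContext({ctx}, {batch_interval})").body
    tree.body = body
    return ast.unparse(tree)  # stdlib stand-in for astor.to_source
```

Parsing templates keeps the sketch short. A sturdier converter would build `ast.Call` and `ast.Lambda` nodes directly and cope with more than one source or sink.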

03 End-to-End Workflow

From an empty terminal to a live streaming job: the batch path runs first, then the converter, then the stream path.

  1. Clone repo
  2. Install deps
  3. Run batch.py
  4. AST convert
  5. Run stream.py
  6. Feed stream_input/
  7. Live console output

04 Batch vs. Stream Code

The exact same business logic, expressed two ways: keep rows whose third column (fields[2]) is greater than 50, then count how many times each value of the second column (fields[1]) occurs.

batch.py

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ExampleRDD").getOrCreate()
sc = spark.sparkContext

rdd = sc.textFile("batch_input/data.csv")

rdd_filtered = rdd.map(lambda line: line.split(",")) \
    .filter(lambda fields: int(fields[2]) > 50) \
    .map(lambda fields: (fields[1], 1)) \
    .reduceByKey(lambda x, y: x + y)

rdd_filtered.coalesce(1).saveAsTextFile("batch_output/output.csv")

stream.py (generated by ast_converter.py)

from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ExampleRDD').getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 5)

lines = ssc.textFileStream('stream_input/')

lines.foreachRDD(lambda rdd: rdd.map(lambda line: line.split(','))
    .filter(lambda fields: int(fields[2]) > 50)
    .map(lambda fields: (fields[1], 1))
    .reduceByKey(lambda x, y: x + y)
    .foreach(lambda result: print(result)))

ssc.start()
ssc.awaitTermination()
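To reason about what either job should print, the shared logic can be mirrored in plain Python. `count_high_values` is a hypothetical reference helper, not part of the project; it assumes well-formed rows with no header line and applies the same split, filter and count steps without Spark, which makes it handy as a test oracle for small inputs.

```python
from collections import Counter


def count_high_values(csv_lines, threshold=50):
    """Plain-Python mirror of the Spark chain in batch.py / stream.py."""
    counts = Counter()
    for line in csv_lines:
        fields = line.split(",")         # like .map(lambda line: line.split(","))
        if int(fields[2]) > threshold:   # like .filter(lambda fields: int(fields[2]) > 50)
            counts[fields[1]] += 1       # like .map(...(fields[1], 1)).reduceByKey(x + y)
    return dict(counts)


# Hypothetical sample rows, not taken from data.csv.
sample = ["1,a,60", "2,b,40", "3,a,70", "4,b,55"]
print(count_high_values(sample))  # {'a': 2, 'b': 1}
```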

05 Getting Started

Everything runs locally. No cluster, no cloud account.

  1. Clone the repository

    git clone https://github.com/SRIKANTH284/Data_Engineering_project.git
  2. Open the project

    Open the cloned folder in VS Code (or your preferred editor).

  3. Enter the project directory

    cd Data_Engineering_project
  4. Clean previous output

    Delete batch_output/ (batch.py recreates it, and Spark refuses to write into an output directory that already exists) and delete stream.py (the converter regenerates it):

    rm -rf batch_output stream.py
  5. Install dependencies

    pip3 install -r requirements.txt
  6. Run the batch job

    python3 batch.py

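    Creates batch_output/output.csv/, which is a directory rather than a single file: coalesce(1) leaves one part file (part-00000) holding one (key, count) pair per line.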

  7. Generate the streaming version

    python3 ast_converter.py batch.py

    Parses batch.py and writes a new stream.py.

  8. Run the streaming job

    python3 stream.py
  9. Simulate streaming data

    In a second terminal, drop CSVs into stream_input/ one at a time:

    cp data1.csv stream_input/data1.csv
    cp data2.csv stream_input/data2.csv

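    Each new file is picked up in the next 5-second micro-batch, filtered, and aggregated, and the resulting counts print to the console. Files already in stream_input/ when stream.py starts are not processed.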
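Spark's textFileStream expects each file to appear in the monitored directory all at once (the PySpark docstring asks for files to be moved in from another location on the same file system), and it ignores file names that start with a dot. A plain cp can briefly expose a half-written file, so one hedged alternative for step 9 is to copy under a hidden temporary name and then rename. `drop_into_stream` is a hypothetical helper, not part of the repository.

```python
import os
import shutil


def drop_into_stream(src_path, stream_dir="stream_input"):
    """Copy src_path into stream_dir so the final name appears in one atomic rename."""
    os.makedirs(stream_dir, exist_ok=True)
    name = os.path.basename(src_path)
    # Dot-prefixed temp name: textFileStream skips hidden files, so the
    # partially written copy is never picked up.
    tmp_path = os.path.join(stream_dir, "." + name + ".tmp")
    final_path = os.path.join(stream_dir, name)
    shutil.copyfile(src_path, tmp_path)
    # os.replace is an atomic rename when both paths are on the same file system.
    os.replace(tmp_path, final_path)
    return final_path
```

For example, `drop_into_stream("data1.csv")` makes stream_input/data1.csv appear in a single rename.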

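Note: The streaming job keeps no state between micro-batches, so each batch prints counts only for the rows that arrived in that 5-second interval. Adding up the per-batch counts for each key across all files gives the overall totals, which match the batch result provided data1.csv and data2.csv together contain the same rows as data.csv.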
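One way to check that claim is to collect the printed tuples from each micro-batch and add them up. Both print(result) and saveAsTextFile write each pair as the str() of a Python tuple, so `ast.literal_eval` can read lines from either side back into (key, count) pairs. The helper names below and the sample lines are hypothetical. If you want Spark itself to keep running totals, DStream.updateStateByKey (which requires a checkpoint directory) is the usual tool, but the generated stream.py does not use it.

```python
import ast
from collections import Counter


def parse_pairs(printed_lines):
    """Read lines like "('a', 2)" back into (key, count) tuples, skipping blanks."""
    return [ast.literal_eval(line) for line in printed_lines if line.strip()]


def running_totals(micro_batches):
    """Sum per-batch (key, count) pairs into cumulative totals per key."""
    totals = Counter()
    for batch in micro_batches:
        for key, count in parse_pairs(batch):
            totals[key] += count
    return dict(totals)


# Hypothetical console output from two micro-batches, and a hypothetical
# part-00000 from batch_output/output.csv/.
stream_batches = [["('a', 1)", "('b', 1)"], ["('a', 1)"]]
batch_part_file = ["('a', 2)", "('b', 1)"]
print(running_totals(stream_batches) == running_totals([batch_part_file]))  # True
```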

06 Project Structure

How the repository's files relate to each other — what reads what, and what gets generated for you.

batch_input/data.csv · read by batch.py
batch_output/ · auto-generated by batch.py
stream_input/ · drop data1.csv / data2.csv here
batch.py · read by ast_converter.py
ast_converter.py · batch.py → stream.py
stream.py · auto-generated by ast_converter.py
data1.csv · copied into stream_input/
data2.csv · copied into stream_input/
requirements.txt · pyspark, astor, ...

Or as a plain tree:

Data_Engineering_project/
├── batch_input/
│   └── data.csv
├── batch_output/        (auto-generated by batch.py)
├── stream_input/        (drop data1.csv / data2.csv here to simulate streaming)
├── batch.py             (Spark RDD batch job)
├── ast_converter.py     (AST transformer: batch.py -> stream.py)
├── stream.py            (auto-generated streaming job)
├── data1.csv
├── data2.csv
├── requirements.txt
└── README.md

07 Tech Stack

Python 3

Core language for all scripts and the AST tooling.

PySpark

Spark RDD API for batch, Spark Streaming for the continuous job.

Python ast

Parses batch.py into a syntax tree for inspection and rewriting.

astor

Converts the transformed AST back into runnable Python source.

argparse

Command-line interface for pointing the converter at a batch file.

py4j

Bridges the PySpark API to the underlying JVM Spark engine.
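The argparse entry refers to the converter's command line, `python3 ast_converter.py batch.py`. Below is a minimal sketch of such an entry point, assuming the output always goes to stream.py next to the input file. `build_parser`, `main` and the `convert` parameter are hypothetical; the identity default stands in for the real AST rewrite.

```python
import argparse
import os


def build_parser():
    """CLI with one positional argument: the batch script to convert."""
    parser = argparse.ArgumentParser(
        description="Rewrite a Spark batch job into a Spark Streaming job.")
    parser.add_argument("batch_file", help="path to the batch script, e.g. batch.py")
    return parser


def main(argv=None, convert=lambda source: source):
    """Read batch_file, run `convert` on its source, and write stream.py beside it.

    `convert` is a placeholder (identity by default) for the real AST rewrite.
    """
    args = build_parser().parse_args(argv)
    with open(args.batch_file) as f:
        stream_source = convert(f.read())
    out_path = os.path.join(os.path.dirname(args.batch_file), "stream.py")
    with open(out_path, "w") as f:
        f.write(stream_source + "\n")
    return out_path
```

Calling `main()` with no arguments parses sys.argv, so an `if __name__ == "__main__": main()` guard at the bottom of the script is enough to wire it up as a command.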

Conclusion

This project shows how batch processing can be transformed into stream processing using Python automation and Spark Streaming (the DStream API). It is easy to run locally and gives a clear view of how a big data pipeline moves from one execution model to the other.

Authors

Srikanth Badavath

Virginia Tech · Blacksburg, VA

Mokshitha Mandadi

Virginia Tech · Blacksburg, VA

Neelesh Samptur

Virginia Tech · Blacksburg, VA