Working with Spark & Flink

From my experience working at Simpl.

In this article, I share some details and learnings around Spark and Flink.

Spark

Basic Architecture:

Basic Spark architecture (Image source: ref)

Apache Spark is an open-source distributed computing system designed for big data processing and analytics. It is known for its speed and efficiency, enabling applications to run faster by utilising in-memory cluster computing.

The Apache Spark framework uses a master-slave architecture that consists of a driver, which runs as the master node, and many executors that run on worker nodes across the cluster. Apache Spark can be used for both batch processing and real-time processing, as illustrated below.
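As a rough illustration of these two modes, here is a minimal PySpark sketch; the paths, column names, Kafka broker, and topic are placeholders, not part of any real pipeline:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("batch-vs-streaming").getOrCreate()

# Batch: read a bounded dataset once, aggregate it, and write the result
batch_df = spark.read.json("s3://some-bucket/events/2024-01-01/")  # placeholder path
(batch_df.groupBy("event_type").count()
    .write.mode("overwrite").parquet("s3://some-bucket/daily-counts/"))

# Real-time: treat the same kind of events as an unbounded stream
# (assumes the Kafka connector is on the classpath; broker and topic are placeholders)
stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "events")
    .load()
)
query = (
    stream_df.groupBy("topic").count()
    .writeStream.outputMode("complete")
    .format("console")
    .start()
)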

Spark Driver: It is the master node of the Spark application. It is responsible for:
  Converting the user program into jobs, stages, and tasks.
  Scheduling tasks on the executors and tracking their progress.
  Maintaining application state and collecting the results of actions back from the executors.

Executors: They are worker processes running on the cluster's worker nodes that execute tasks assigned by the driver. Each Spark application has its own executors, which are responsible for:
  Running the tasks of each stage on their assigned data partitions.
  Caching data in memory or on disk and serving shuffle data.
  Reporting task status and heartbeats back to the driver.

Related snapshots from a real system:

Cluster configuration: Databricks Spark cluster configs

How jobs appear while running Spark code: spark-jobs-1 spark-jobs-2

Stages of a Spark job:
spark-job-stages

Worker (executor) information while the code is running:
spark-executors

Cluster Manager: It is responsible for managing resources across the cluster. It allocates resources to different applications and manages their lifecycle. Common cluster managers used with Spark include:
  Standalone (Spark's built-in cluster manager)
  Apache Hadoop YARN
  Kubernetes
  Apache Mesos (deprecated in recent Spark releases)
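The cluster manager is selected through the master URL (usually passed via spark-submit --master rather than hard-coded); a minimal sketch with placeholder values:

from pyspark.sql import SparkSession

# "local[*]" runs driver and executors inside a single JVM, which is handy for testing.
# On a real cluster the master URL would instead be "yarn",
# "spark://<host>:7077" (standalone) or "k8s://https://<api-server>:6443".
spark = (
    SparkSession.builder
    .appName("cluster-manager-demo")
    .master("local[*]")
    .getOrCreate()
)
print(spark.sparkContext.master)   # which cluster manager this application talks to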

Data Abstractions:

Partitions: A partition is a logical chunk of data that Spark distributes across the cluster. Each partition is processed by a single task, allowing for parallel execution. When Spark reads data from a source (e.g., HDFS), it creates partitions based on the input size and configuration settings. By default, Spark creates one partition for each HDFS block (typically 128 MB).
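To make partitions concrete, here is a small local-mode sketch (made-up data) that shows how elements are distributed across a chosen number of partitions:

from pyspark import SparkContext

sc = SparkContext("local[4]", "Partition Demo")

# Create an RDD with an explicit number of partitions
rdd = sc.parallelize(range(10), numSlices=4)
print(rdd.getNumPartitions())   # 4

# glom() collects the elements of each partition into a list,
# so we can see exactly how the data was split
print(rdd.glom().collect())     # e.g. [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]

sc.stop()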

Execution Flow:

from pyspark import SparkContext
# Initialize Spark Context
sc = SparkContext("local", "Word Count")
# Read text file into an RDD
lines = sc.textFile("hdfs://path/to/input.txt")
# Check initial number of partitions
print(f"Initial number of partitions: {lines.getNumPartitions()}")
# Filter lines containing 'keyword'
filtered_lines = lines.filter(lambda line: 'keyword' in line)
# Count filtered lines
count = filtered_lines.count()
# Print result
print(f"Number of lines containing 'keyword': {count}")
# Repartitioning example
repartitioned_lines = filtered_lines.repartition(10)
print(f"Number of partitions after repartitioning: {repartitioned_lines.getNumPartitions()}")
# Stop the Spark Context
sc.stop()

Step-by-Step Execution
  Driver Initialization: The SparkContext is created, initializing the driver program.
  RDD Creation: The textFile() method creates an RDD from the input text file, automatically partitioning it based on HDFS blocks (one partition per block for a sufficiently large file).
  Initial Partition Check: The initial number of partitions is printed using getNumPartitions().
  Transformation: The filter() transformation creates a new RDD containing only lines with 'keyword'.
  Action Execution: The count() action triggers execution across the cluster.
  Task Scheduling: The driver divides the job into stages at shuffle boundaries and creates one task per data partition within each stage (see the lineage sketch after this list).
  Task Execution on Executors: Executors run these tasks in parallel threads, processing their assigned data partitions.
  Repartitioning Example: The repartition(10) method reshuffles the filtered RDD into 10 partitions to demonstrate how you can control partitioning dynamically.
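A rough way to see this stage division is to print the RDD lineage with toDebugString(); the indentation changes at shuffle boundaries. A sketch, assuming the SparkContext from the example above is still active (i.e., run this before sc.stop()):

# Narrow transformations (filter) stay within one stage
print(filtered_lines.toDebugString().decode())

# A wide transformation such as reduceByKey introduces a shuffle and hence a new stage
word_counts = (
    lines.flatMap(lambda line: line.split())
         .map(lambda word: (word, 1))
         .reduceByKey(lambda a, b: a + b)
)
print(word_counts.toDebugString().decode())   # lineage now shows a shuffle boundary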

Hardware Level Configuration:
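A minimal sketch of the knobs usually tuned at this level (executor memory, cores, and number of executors); the values below are placeholders, not recommendations:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("hardware-config-demo")
    # Per-executor resources (placeholder values)
    .config("spark.executor.memory", "8g")        # heap per executor
    .config("spark.executor.cores", "4")          # concurrent tasks per executor
    .config("spark.executor.instances", "10")     # number of executors (static allocation)
    # Driver resources
    .config("spark.driver.memory", "4g")
    # Often set to roughly 2-3x the total executor cores
    .config("spark.sql.shuffle.partitions", "120")
    .getOrCreate()
)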

Flink

Basic Architecture (more snapshots of actual pipelines below):

flink-arch (Image source: ref)

Job Manager: It is the master component responsible for coordinating the execution of jobs. It handles job submission, schedules and distributes tasks across Task Managers, coordinates checkpoints and recovery in case of failures, and manages resources.

Task Managers: They are the worker processes that actually execute the tasks of a job. Each Task Manager provides a number of task slots, and the job's operators run in parallel across these slots.
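As a small illustration of what the Job Manager coordinates, a job usually declares its parallelism and checkpointing when the execution environment is built; a minimal Java sketch with placeholder values:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);                                            // tasks are spread across Task Manager slots
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE); // checkpoint every 60 seconds
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L); // breathing room between checkpoints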

// Streams of events A and B, mapped to typed events with event-time timestamps and watermarks assigned
SingleOutputStreamOperator<BaseEventA> eventAMappedStream = streamA
        .process(new eventAMapper())
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<BaseEventA>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withIdleness(Duration.ofMinutes(2))
                        .withTimestampAssigner((SerializableTimestampAssigner<BaseEventA>) (event, recordTimestamp) -> {
                            // Event-time watermarking based on the event's own timestamp field
                            return OffsetDateTime.parse(event.getEventTimestamp()).toInstant().toEpochMilli();
                        })
        )
        .uid("EventA-Stream")
        .name("EventA-Stream");
        
SingleOutputStreamOperator<BaseEventB> eventBMappedStream = streamB
        .process(new eventBMapper())
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<BaseEventB>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withIdleness(Duration.ofMinutes(2))
                        .withTimestampAssigner((SerializableTimestampAssigner<BaseEventB>) (event, recordTimestamp) -> {
                            // Event-time watermarking based on the event's own timestamp field
                            return OffsetDateTime.parse(event.getEventTimestamp()).toInstant().toEpochMilli();
                        })
        )
        .uid("EventB-Stream")
        .name("EventB-Stream");
        
// Join stream A and stream B events on a common attribute (say, a token present in both events)
SingleOutputStreamOperator<ProcessedEvent> joinedStream = eventAMappedStream.keyBy(BaseEventA::getTokenA)
        .intervalJoin(
                eventBMappedStream.keyBy(BaseEventB::getTokenB)
        )
        .between(Time.seconds(-60), Time.seconds(60))
        .process(new ProcessedEventMapper())
        .uid("Processed-Join-Stream-Event")
        .name("Processed-Join-Stream-Event");