Spark Structured Streaming Performance — Part 1

Saiyantan Ghosh
4 min read · Jan 3, 2021


Benchmarking the performance of a Spark Structured Streaming application is very important, because things can get worse or behave unexpectedly during peak load. If you think of a Spark Structured Streaming query as a pipeline, there is a source and a sink, and in between you have transformations. Let's consider Kafka as the source and Cassandra as the sink.

Apart from peak load, another important factor is whether you have a dependent Spark streaming query that performs time-consuming processing, such as an aggregation on a fixed time interval.

I have created a sample use case to illustrate such a dependent Structured Streaming query. Let's say we have a real-time analytics platform that receives order event data from Kafka and generates a daily report summary, such as the total daily purchase amount.

Order event data is inserted into the event table from Kafka, and the amounts are aggregated by date into the orderSummary table. The Cassandra table records look like below —

[table screenshot: order event data]
[table screenshot: daily order summary]

There are two Spark Structured Streaming queries —

eventIngestionQuery — the streaming query that takes data from Kafka and inserts it into the event table on each micro-batch.

dailySumEventQuery (dependent) — the streaming query that triggers on a 30-second interval, accumulates the data, and performs a summation of the amounts.

dailySumEventQuery is dependent on eventIngestionQuery and aggregates events every 30 seconds.

Below is sample code to illustrate the dependent query.

import com.datastax.spark.connector.cql.CassandraConnector;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.DataTypes;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class KafkaStructuredStreaming {

    private static ObjectMapper mapper = new ObjectMapper();

    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "C:\\hadoop");

        SparkConf conf = new SparkConf().setAppName("Spark Structured Streaming").setMaster("local[2]");
        SparkContext sparkContext = new SparkContext(conf);
        SparkSession sparkSession = new SparkSession(sparkContext);

        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        // connect to db
        CassandraDBConnector client = new CassandraDBConnector();
        client.connect("localhost", 9042);

        SparkConf cassandraConf =
                sparkSession
                        .sparkContext()
                        .getConf()
                        .set("spark.cassandra.connection.host", "localhost")
                        .set("spark.cassandra.connection.port", "9042")
                        .set("spark.cassandra.input.consistency.level", "LOCAL_QUORUM");

        CassandraConnector cassandraConnector = CassandraConnector.apply(cassandraConf);

        final AggregateAmount sumAmount =
                new AggregateAmount(DataTypes.StringType, DataTypes.LongType);

        // insert to cassandra event table
        final EventInsert eventInsert = new EventInsert(cassandraConnector);

        // insert to cassandra orderSummary table
        final DailyReportInsert dailyReportInsert = new DailyReportInsert(cassandraConnector);

        // define kafka source
        Dataset<Row> sourceDf = sparkSession
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("startingOffsets", "latest")
                .option("subscribe", "Event")
                .load()
                .selectExpr("CAST(value AS STRING)");

        // convert to Event
        final Dataset<Event> eventDs = sourceDf.as(Encoders.BINARY())
                .map((MapFunction<byte[], Event>) (byte[] x) -> {
                    try {
                        final Event event = mapper.readValue(x, Event.class);
                        java.sql.Timestamp ts = java.sql.Timestamp.from(event.getOccurAt().toInstant());
                        LocalDate justDate = ts.toLocalDateTime().toLocalDate();
                        event.setOccurDate(justDate.format(DateTimeFormatter.ISO_LOCAL_DATE));
                        return event;
                    } catch (Exception e) {
                        e.printStackTrace();
                        return null;
                    }
                }, Encoders.bean(Event.class))
                .filter((FilterFunction<Event>) x -> x != null);

        // convert to dailySumEvent
        final Dataset<Row> dailySumEvent = eventDs
                .groupBy("category", "type", "occurDate")
                .agg(sumAmount.apply(functions.col("data")).as("aggData"));

        // eventIngestionQuery
        StreamingQuery eventIngestionQuery =
                eventDs
                        .writeStream()
                        .format("console")
                        .option("truncate", false)
                        .option("checkpointLocation", "file:/checkpoint/eventIngestionQuery/")
                        .outputMode(OutputMode.Append())
                        .foreach(eventInsert)
                        .queryName("eventIngestionQuery")
                        .start();

        // dailySumEventQuery
        StreamingQuery dailySumEventQuery =
                dailySumEvent
                        .writeStream()
                        .format("console")
                        .option("truncate", false)
                        .trigger(Trigger.ProcessingTime("30 seconds"))
                        .option("checkpointLocation", "file:/checkpoint/dailySumEventQuery/")
                        .outputMode(OutputMode.Update())
                        .foreach(dailyReportInsert)
                        .queryName("dailySumEventQuery")
                        .start();

        try {
            eventIngestionQuery.awaitTermination();
            dailySumEventQuery.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
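The helper classes referenced above (Event, AggregateAmount, EventInsert, DailyReportInsert, CassandraDBConnector) are not part of this snippet. Purely as an illustration, here is a minimal sketch of what a ForeachWriter such as EventInsert could look like. It assumes Spark Cassandra Connector 3.x (where openSession() returns a CqlSession) and that Event is a simple bean with getters matching the JSON payload fields; the actual implementation may differ.

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.spark.connector.cql.CassandraConnector;
import org.apache.spark.sql.ForeachWriter;

// Hypothetical sketch of the EventInsert ForeachWriter used by eventIngestionQuery.
// Assumes Spark Cassandra Connector 3.x, where openSession() returns a CqlSession.
public class EventInsert extends ForeachWriter<Event> {

    private final CassandraConnector connector;
    private transient CqlSession session;

    public EventInsert(CassandraConnector connector) {
        this.connector = connector;
    }

    @Override
    public boolean open(long partitionId, long epochId) {
        // one session per partition/epoch; the connector caches connections internally
        session = connector.openSession();
        return true;
    }

    @Override
    public void process(Event event) {
        // column names follow the eventdb.event DDL shown below
        session.execute(
                "INSERT INTO eventdb.event (category, type, occurat, id, data, isrt_ts) "
                        + "VALUES (?, ?, ?, ?, ?, toTimestamp(now()))",
                event.getCategory(), event.getType(), event.getOccurAt().toInstant(),
                event.getId(), event.getData());
    }

    @Override
    public void close(Throwable errorOrNull) {
        if (session != null) {
            session.close();
        }
    }
}

The DDL for the two Cassandra tables is below —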

CREATE TABLE eventdb.event (
category text,
type text,
occurat timestamp,
id text,
data map<text, bigint>,
isrt_ts timestamp,
PRIMARY KEY ((category, type, occurat), id)
) WITH CLUSTERING ORDER BY (id ASC);

CREATE TABLE eventdb.ordersummary (
report_dt text,
category text,
type text,
data map<text, bigint>,
isrt_ts timestamp,
PRIMARY KEY (report_dt, category, type)
) WITH CLUSTERING ORDER BY (category ASC, type ASC);
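For a quick sanity check of what the two queries write, the same connector can also read these tables back as a batch DataFrame. A small sketch, assuming the same local Cassandra settings and the sparkSession from the main class:

// Quick sanity check (batch read, not streaming): load the summary table back
// through the Spark Cassandra Connector and show the aggregated rows.
Dataset<Row> summary = sparkSession.read()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "eventdb")
        .option("table", "ordersummary")
        .load();

summary.show(false);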

To publish events using the Kafka console producer, one can use the sample payload below —

kafka-console-producer.bat --broker-list localhost:9092 --topic Event
{"id":"3","category":"ORDER","type":"CHECKOUT","occurAt":1609678572000,"data":{"AMOUNT":1500}}

So if you publish an event, the amount aggregation will trigger every 30 seconds and perform the summation grouped by "category", "type", and "occurDate". Now, if the aggregated query's batches take more time to complete, this will affect the performance and execution time of the other query and cause slowness in the overall processing.

I have even seen a fairly small load, around five hundred thousand records within 2–3 minutes, cause slowness in Spark Structured Streaming processing.

So it is very important to capture the performance-related metrics that the Spark system itself provides. They give a clear idea of the nature of the load (whether linear or exponential) and quick insight into the performance of each Structured Streaming query.
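As a preview of what those built-in metrics look like, Spark exposes per-micro-batch progress through StreamingQuery.lastProgress() and through a StreamingQueryListener. A minimal sketch of the listener approach is below; it simply prints the progress JSON and may differ from the offline method discussed in the next part.

import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent;
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent;
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryTerminatedEvent;

// Illustrative only — register before start(); Spark then reports the built-in
// per-micro-batch metrics (numInputRows, inputRowsPerSecond,
// processedRowsPerSecond, durationMs, ...) for every streaming query.
sparkSession.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent event) {
        System.out.println("query started: " + event.name());
    }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        // full progress record as JSON, one entry per micro batch
        System.out.println(event.progress().json());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
        System.out.println("query terminated: " + event.id());
    }
});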

Next, I will discuss how we can capture these metrics (an offline method), the details of those metrics, and some decisions we can make based on them.

Spark on your mind!!!
