Mastering the Integration of Spark Structured Streaming with the Confluent Schema Registry in Kafka
In this guide, we will look at how to connect Spark Structured Streaming with the Confluent Schema Registry. Spark processes the real-time data from Apache Kafka, and the Schema Registry stores the Avro schemas that describe each message. We will cover setting up the environment, configuring Spark for Structured Streaming, and serializing data with Avro schemas. When we use the Schema Registry well, our streaming apps keep working as schemas change over time. This guide is for data engineers, software developers, and architects who want to make this integration work.
In this guide, we will cover the following parts:
- Part 1 - Setting Up the Environment: We will learn how to get our system ready for Spark Structured Streaming and Kafka integration.
- Part 2 - Configuring Spark for Structured Streaming: We will see the needed settings to make Spark ready for structured streaming apps.
- Part 3 - Connecting to the Confluent Schema Registry: We will find out how to connect our Spark app to the Confluent Schema Registry.
- Part 4 - Defining Avro Schemas for Data Serialization: We will look at how to define Avro schemas to serialize data well.
- Part 5 - Building a Spark Streaming Application with Schema Registry Integration: We will go through the steps to create a Spark streaming app that uses the Schema Registry.
- Part 6 - Handling Schema Evolution in Streaming Applications: We will learn how to manage and apply schema evolution in our streaming app.
By following this guide, we will be able to connect Spark Structured Streaming with the Confluent Schema Registry in our Kafka streaming apps. For more tips on Kafka integration, check our resources on Kafka with Confluent and Kafka with Spark.
Part 1 - Setting Up the Environment
To connect Spark Structured Streaming with the Confluent Schema Registry, we need to set up the environment. This means we will install and configure Apache Spark, Apache Kafka, and the Confluent Schema Registry.
Prerequisites
Java Development Kit (JDK): We need JDK 8 or 11 for Spark 3.2. Spark 3.3 also supports Java 17.
We can check if Java is installed by running:
java -version
Apache Spark: We need to download and install Spark.
- Go to the Apache Spark download page and pick a pre-built package for Hadoop.
- Extract the downloaded file:
tar -xzf spark-3.2.1-bin-hadoop3.2.tgz
cd spark-3.2.1-bin-hadoop3.2
Apache Kafka: We also need to download and install Kafka.
- Download Kafka from the Apache Kafka website.
- Extract the Kafka package:
tar -xzf kafka_2.12-2.8.0.tgz
cd kafka_2.12-2.8.0
Confluent Schema Registry: To use the Schema Registry, we can either download it with the Confluent Platform or run it using Docker.
- To run with Docker, we can use this command:
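docker run -d --name schema-registry \
  -p 8081:8081 \
  -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
  -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=<kafka-broker>:9092 \
  -e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 \
  confluentinc/cp-schema-registry:latest
We need to replace <kafka-broker> with our Kafka broker address. The address must be reachable from inside the container, so localhost does not work if Kafka runs on the host machine. The image also expects SCHEMA_REGISTRY_HOST_NAME to be set.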
Environment Configuration
Set Environment Variables: We should add these lines to our .bashrc or .bash_profile file to set the environment variables for Spark and Kafka:
export SPARK_HOME=/path/to/spark-3.2.1-bin-hadoop3.2
export KAFKA_HOME=/path/to/kafka_2.12-2.8.0
export PATH=$PATH:$SPARK_HOME/bin:$KAFKA_HOME/bin
After editing, we refresh our terminal:
source ~/.bashrc
Start Kafka and Zookeeper: In this Kafka version, the broker uses Zookeeper by default to manage cluster metadata. Each command runs in the foreground, so we start Zookeeper first and then start the broker in a second terminal:
# Start Zookeeper
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
# Start Kafka Broker (in a second terminal)
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
Verify Kafka Installation: We can create a topic to check our Kafka installation:
$KAFKA_HOME/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
We can list the topics to make sure it was created:
$KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Testing Schema Registry: We can check if the Schema Registry is running by sending a GET request:
curl http://localhost:8081/subjects
This setup will prepare our environment for connecting Spark Structured Streaming with the Confluent Schema Registry. For more details on Kafka configuration, we can look at our article on Kafka with Confluent. This will help us ensure we have all we need to continue with the integration.
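For a scripted check, such as one in a CI job, we can send the same GET /subjects request from Python using only the standard library. This is a minimal sketch. The function name list_subjects and the 5-second timeout are our own choices and are not part of any Confluent tool.

```python
import json
import urllib.request
from urllib.error import URLError


def list_subjects(base_url: str = "http://localhost:8081", timeout: float = 5.0) -> list:
    """Return the registered subject names, or raise RuntimeError if the request fails."""
    url = base_url.rstrip("/") + "/subjects"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            # The registry answers with a JSON array of subject names.
            return json.load(response)
    except URLError as err:  # also covers HTTPError (e.g. 404, 500)
        raise RuntimeError(f"Schema Registry request failed for {url}: {err}") from err
```

Against a fresh local registry, list_subjects() returns an empty list. A RuntimeError means the registry is not reachable at that URL or returned an HTTP error.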
Part 2 - Configuring Spark for Structured Streaming
To connect Spark Structured Streaming to the Confluent Schema Registry, we need to set up our Spark environment. Besides Spark itself, our application needs three things on its classpath: the Kafka source for Structured Streaming, Avro support, and the Schema Registry client. Here are the steps to configure Spark for Structured Streaming.
Step 1: Set Up Spark Environment
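First, we need to have Apache Spark installed. We can download it from the official Apache Spark website. We also need a JDK. If we write the application in Scala, we need a Scala version that matches our Spark build (2.12 for the packages used here) and a build tool such as Maven or sbt.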
Step 2: Adding Dependencies
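To use Spark Structured Streaming with the Confluent Schema Registry, we must add some dependencies to our build file. If we use Maven, we add them to our pom.xml. Structured Streaming reads from Kafka through spark-sql-kafka-0-10. We do not need spark-streaming, because it is the older DStream API. spark-avro provides the from_avro and to_avro functions. The Confluent artifacts come from Confluent's own Maven repository, not Maven Central. The Spark artifact versions should match the Spark version we run, and spark-sql-kafka-0-10 already brings in a compatible kafka-clients.
<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.12</artifactId>
    <version>3.3.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>7.4.0</version>
  </dependency>
</dependencies>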
Step 3: Configuration Properties
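Next, we configure the Spark application. Spark does not read Kafka or Schema Registry settings from the session configuration. We pass Kafka connection settings as kafka.-prefixed options on readStream and writeStream, and only our own deserialization code uses the Schema Registry URL. The session itself needs only Spark settings, such as a default checkpoint location. Here is an example of how to configure Spark to read from Kafka:
import org.apache.spark.sql.SparkSession

// Connection settings we pass to the Kafka source and to our Avro code later
val kafkaBootstrapServers = "localhost:9092"
val schemaRegistryUrl = "http://localhost:8081"

val spark = SparkSession.builder()
  .appName("Spark Structured Streaming with Schema Registry")
  .config("spark.sql.streaming.checkpointLocation", "/path/to/checkpoint/dir")
  .getOrCreate()

// Kafka settings go on the source, not on the session
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .option("subscribe", "user-profiles")
  .load()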
Step 4: Enabling Avro Support
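Settings such as key.converter and value.converter belong to Kafka Connect, and Spark ignores them. Spark's Kafka source always returns key and value as binary columns, so Avro decoding happens inside our query. The spark-avro module provides from_avro, which decodes plain Avro binary using a schema we pass as a JSON string.

Messages written by Confluent serializers start with a 5-byte header (a magic byte and the schema ID), so we skip those bytes first. Spark SQL's substring is 1-based, so position 6 is the first Avro byte. In the snippet below, kafkaDF is the streaming DataFrame loaded from Kafka, and schemaJson holds the Avro schema of the values as a JSON string:
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.expr
// Skip the 5-byte Confluent header, then decode the Avro body with the writer schema
val decoded = kafkaDF.select(from_avro(expr("substring(value, 6, length(value) - 5)"), schemaJson).as("data"))
val userProfiles = decoded.select("data.*")
This works when every message was written with that schema. If a topic mixes schema versions, we need a deserializer that looks up each message's schema ID, as in Part 3.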
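Here is what Confluent serializers actually put in a Kafka message value: a magic byte 0, then the schema ID as a 4-byte big-endian integer, then the Avro-encoded body. The minimal Python sketch below splits that 5-byte header off a raw value, which is useful when debugging a topic. The function and constant names are our own.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0                 # first byte of every Confluent-framed message
HEADER = struct.Struct(">bI")  # magic byte + 4-byte big-endian schema ID = 5 bytes


def split_confluent_header(message: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_body) for a value written by a Confluent Avro serializer."""
    if len(message) < HEADER.size:
        raise ValueError("message is shorter than the 5-byte Confluent header")
    magic, schema_id = HEADER.unpack_from(message, 0)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}: not Confluent-framed Avro")
    return schema_id, message[HEADER.size:]
```

The header has a fixed length, so a Spark query can drop it the same way: it takes the value bytes from the sixth byte onward before calling from_avro.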
Step 5: Running Your Spark Application
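After we configure Spark, we can run our application using spark-submit. The classpath must include the Kafka source, spark-avro, and the Confluent client. The Spark packages must match our Spark and Scala versions. The Confluent artifacts are not on Maven Central, so we add Confluent's repository with --repositories. Here is an example command to submit our Spark job:
spark-submit \
  --class com.example.YourSparkApp \
  --master local[2] \
  --repositories https://packages.confluent.io/maven/ \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-avro_2.12:3.3.0,io.confluent:kafka-schema-registry-client:7.4.0 \
  target/your-spark-app-1.0.jar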
Conclusion
By following these steps, we have a Spark environment ready for Structured Streaming with the Confluent Schema Registry. For more details on Kafka integration with Spark, see our guide on Kafka with Spark.
This setup lets our Spark application read from and write to Kafka topics, while the Schema Registry manages the Avro schemas.
Part 3 - Connecting to the Confluent Schema Registry
To connect our Spark Structured Streaming apps to the Confluent Schema Registry, we must set up our Spark app to use the Schema Registry. This helps us manage Avro schemas. The Schema Registry is a central place for our schemas. It keeps our data consistent when it goes between Kafka and our Spark app.
Prerequisites
- Confluent Schema Registry: We need to make sure that the Confluent Schema Registry is running. We can follow the Kafka with Confluent guide to install and set it up.
- Spark Structured Streaming: We should have Apache Spark set up and ready to work with Kafka.
Dependencies
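If we build our Spark app with Maven, we add the following dependencies to pom.xml. With sbt, the same coordinates go into build.sbt. KafkaAvroDeserializer, mentioned below, lives in kafka-avro-serializer. The Confluent artifacts are served from https://packages.confluent.io/maven/, so we must declare that repository as well:
<dependencies>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>7.0.0</version> <!-- Use the right version -->
  </dependency>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.0.0</version> <!-- Same version as the client -->
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.2.0</version> <!-- Use the right version -->
  </dependency>
</dependencies>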
Configuration
Spark itself never talks to the Schema Registry, so settings like value.converter or schema.registry.url on the SparkSession have no effect. Converters are a Kafka Connect concept. We keep the registry URL in a value for our own deserialization code and set only Spark settings on the session:
import org.apache.spark.sql.SparkSession

val schemaRegistryUrl = "http://localhost:8081" // Change the URL if needed

val spark = SparkSession.builder()
  .appName("Spark Structured Streaming with Schema Registry")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()
Kafka Consumer Configuration
Spark's Kafka source always delivers key and value as byte arrays, and it rejects the key.deserializer and value.deserializer options. So instead of configuring KafkaAvroDeserializer on the consumer, we call it ourselves on the raw value bytes. For each message, it reads the schema ID, fetches that schema from the registry, and decodes the record:
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import scala.collection.JavaConverters._
import spark.implicits._

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // Kafka broker address
  .option("subscribe", "your_topic")                   // Our Kafka topic
  .option("startingOffsets", "earliest")               // Start from the first message
  .load()

// Keep the value as bytes: casting Avro binary to STRING would corrupt it
val deserializedData = kafkaStream.select("value").as[Array[Byte]]
  .mapPartitions { values =>
    // KafkaAvroDeserializer is not serializable, so we create one per partition on the executor
    val deserializer = new KafkaAvroDeserializer()
    deserializer.configure(Map("schema.registry.url" -> "http://localhost:8081").asJava, false)
    // Each record becomes a JSON-like string via its toString method
    values.map(bytes => String.valueOf(deserializer.deserialize("your_topic", bytes)))
  }
Error Handling
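We need to handle possible errors when we connect to the Schema Registry. Creating a CachedSchemaRegistryClient does not contact the server. To find out early whether the registry is reachable, we make a cheap request, such as listing subjects, inside a try-catch block:
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

val schemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:8081", 100)
try {
  // The first real request is where connection problems show up
  val subjects = schemaRegistryClient.getAllSubjects()
  println(s"Connected to Schema Registry; ${subjects.size} subjects registered")
} catch {
  case e: Exception =>
    println(s"Failed to connect to Schema Registry: ${e.getMessage}")
}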
This setup helps our Spark Structured Streaming app work with the Confluent Schema Registry. It makes sure our Kafka messages are properly serialized and deserialized with the defined Avro schemas. For more info on Kafka integration, we can check out Kafka with Spark.
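CachedSchemaRegistryClient avoids a registry round trip for every message by caching schemas after the first lookup. The minimal Python sketch below shows the same idea using the registry's GET /schemas/ids/{id} endpoint, which returns the schema as a JSON string under the "schema" key. SchemaCache and the injectable fetch parameter are hypothetical names, chosen so we can test the cache without a server. Unlike the Java client, this cache has no size limit.

```python
import json
import urllib.request
from typing import Callable, Dict


def http_get_json(url: str, timeout: float = 5.0) -> dict:
    """Fetch a URL and parse the JSON response body."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)


class SchemaCache:
    """Fetch Avro schemas by ID from the Schema Registry and keep them in memory."""

    def __init__(self, base_url: str, fetch: Callable[[str], dict] = http_get_json):
        self.base_url = base_url.rstrip("/")
        self._fetch = fetch
        self._by_id: Dict[int, object] = {}

    def get(self, schema_id: int):
        if schema_id not in self._by_id:
            # GET /schemas/ids/{id} returns {"schema": "<schema as a JSON string>"}
            payload = self._fetch(f"{self.base_url}/schemas/ids/{schema_id}")
            self._by_id[schema_id] = json.loads(payload["schema"])
        return self._by_id[schema_id]
```

Network failures surface as urllib's URLError from the default fetch function, so callers can wrap get in the same kind of error handling as the Scala example.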
Part 4 - Defining Avro Schemas for Data Serialization
In this section, we will define Avro schemas. They describe the data we serialize when we connect Spark Structured Streaming with the Confluent Schema Registry. Avro is a compact binary format. Because the schema is stored once in the registry, each message carries only the field values plus a short schema ID instead of the full schema. This keeps messages small and fast to process, which matters in streaming applications.
Understanding Avro Schemas
Avro schemas show how the data looks when we serialize it. We write an Avro schema in JSON format. It usually has these parts:
- Type: This tells the data type (like record, enum, array).
- Name: This is the name of the schema.
- Fields: This is a list of fields in the schema. Each field has a name and type.
Example Avro Schema
Here is an example of an Avro schema for a user profile that we can send through Kafka topics:
{
"type": "record",
"name": "UserProfile",
"namespace": "com.example",
"fields": [
{
"name": "userId",
"type": "string"
},
{
"name": "firstName",
"type": "string"
},
{
"name": "lastName",
"type": "string"
},
{
"name": "email",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "registered",
"type": "boolean"
}
]
}
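An Avro schema is plain JSON, so we can inspect it with any JSON parser before registering it. The minimal Python sketch below checks the record-level parts described above (type, name, fields). It also derives the full name from namespace and name, which is how Avro identifies a named type. describe_record is a hypothetical helper and does not replace a real Avro parser such as the one in the avro or fastavro packages.

```python
import json

USER_PROFILE_SCHEMA = """
{"type": "record", "name": "UserProfile", "namespace": "com.example",
 "fields": [
   {"name": "userId", "type": "string"}, {"name": "firstName", "type": "string"},
   {"name": "lastName", "type": "string"}, {"name": "email", "type": "string"},
   {"name": "age", "type": "int"}, {"name": "registered", "type": "boolean"}]}
"""


def describe_record(schema_json: str):
    """Return (full_name, [(field_name, field_type), ...]) for an Avro record schema."""
    schema = json.loads(schema_json)
    if schema.get("type") != "record":
        raise ValueError("expected a record schema")
    for key in ("name", "fields"):
        if key not in schema:
            raise ValueError(f"record schema is missing '{key}'")
    # The full name is "<namespace>.<name>" when a namespace is present
    namespace = schema.get("namespace")
    full_name = f"{namespace}.{schema['name']}" if namespace else schema["name"]
    fields = [(field["name"], field["type"]) for field in schema["fields"]]
    return full_name, fields
```

For the UserProfile schema, the full name is com.example.UserProfile and the record has six fields.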
Registering Avro Schemas with Confluent Schema Registry
After we define our Avro schema, we need to register it with the Confluent Schema Registry. This helps us manage schema versions and deal with schema changes in our streaming apps.
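To register the schema, we can use the Schema Registry REST API. With the default subject naming strategy (TopicNameStrategy), serializers look up the subject <topic>-value. The producer below writes to the user-profiles topic, so we register the schema under user-profiles-value. Here is an example using curl:
curl -X POST \
  -H "Content-Type: application/json" \
  http://localhost:8081/subjects/user-profiles-value/versions \
  -d '{
  "schema": "{\"type\":\"record\",\"name\":\"UserProfile\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"userId\",\"type\":\"string\"},{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"registered\",\"type\":\"boolean\"}]}"
}'
The response contains the ID that the registry assigned to the schema, for example {"id": 1}.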
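Escaping the schema by hand, as in the curl command, is error-prone because the schema travels as a JSON string inside a JSON body. The minimal Python sketch below builds the same POST /subjects/{subject}/versions request and lets json.dumps do the escaping. build_register_request is our own helper name. It only builds the request; passing the result to urllib.request.urlopen sends it. application/vnd.schemaregistry.v1+json is the registry's own media type, and plain application/json is also accepted.

```python
import json
import urllib.request


def build_register_request(base_url: str, subject: str, schema: dict) -> urllib.request.Request:
    """Build (but do not send) a request that registers an Avro schema under a subject."""
    # The registry expects {"schema": "<schema as a string>"}, so the schema is encoded twice
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )
```

To send it, we call urllib.request.urlopen(request) and read the JSON response, which holds the new schema ID.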
Serializing Data with Avro in Spark
In our Spark application, or in any producer that feeds the topic, we use the Avro schema to serialize data before we send it to Kafka. The snippet below builds a UserProfile record and sends it with Confluent's KafkaAvroSerializer. We use the serializer instead of encoding bytes by hand because consumers that rely on the Schema Registry expect each message to start with the Confluent header (magic byte and schema ID). The serializer adds that header after looking up the schema. This requires the kafka-avro-serializer dependency:
import java.util.Properties
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val schemaString = """{
  "type": "record",
  "name": "UserProfile",
  "namespace": "com.example",
  "fields": [
    {"name": "userId", "type": "string"},
    {"name": "firstName", "type": "string"},
    {"name": "lastName", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "registered", "type": "boolean"}
  ]
}"""
val schema: Schema = new Schema.Parser().parse(schemaString)

// Build a record that follows the schema
val record = new GenericData.Record(schema)
record.put("userId", "1234")
record.put("firstName", "John")
record.put("lastName", "Doe")
record.put("email", "john.doe@example.com")
record.put("age", 30)
record.put("registered", true)

// KafkaAvroSerializer looks up (or registers) the schema in the Schema Registry
// and writes the Confluent header (magic byte + schema ID) before the Avro bytes
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("schema.registry.url", "http://localhost:8081")

val producer = new KafkaProducer[String, GenericRecord](props)
producer.send(new ProducerRecord[String, GenericRecord]("user-profiles", "1234", record))
producer.flush()
producer.close()
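To see what ends up in the Kafka message, here is a minimal hand-rolled Python sketch of Avro's binary encoding for the UserProfile record, followed by the Confluent framing. It covers only the types this schema uses. An int is a zigzag-encoded variable-length integer, a string is a length followed by UTF-8 bytes, and a boolean is a single byte. Fields are written in schema order, with no names or separators. The schema ID 1 is a made-up placeholder for the ID the registry returns at registration. Real code should use a library, such as Confluent's KafkaAvroSerializer or the avro/fastavro packages, rather than this sketch.

```python
import struct


def zigzag_varint(n: int) -> bytes:
    """Avro int/long: zigzag-map the signed value, then emit 7 bits per byte, low bits first."""
    z = (n << 1) ^ (n >> 63)  # zigzag for 64-bit longs; also correct for 32-bit ints
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string(s: str) -> bytes:
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_user_profile(user: dict) -> bytes:
    """Encode a UserProfile dict in schema field order (field names are not written)."""
    return (
        encode_string(user["userId"])
        + encode_string(user["firstName"])
        + encode_string(user["lastName"])
        + encode_string(user["email"])
        + zigzag_varint(user["age"])
        + (b"\x01" if user["registered"] else b"\x00")
    )


def confluent_frame(schema_id: int, avro_body: bytes) -> bytes:
    """Prepend the magic byte 0 and the 4-byte big-endian schema ID."""
    return struct.pack(">bI", 0, schema_id) + avro_body
```

With the sample values from the Scala code, the Avro body is 37 bytes, and the framed message is 42 bytes. Age 30 becomes the single byte 0x3c (zigzag 60).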
Conclusion
Defining Avro schemas for data serialization is an important step when we integrate Spark Structured Streaming with the Confluent Schema Registry. When producers and consumers agree on registered schemas, our data stays consistent and compatible across versions of our streaming apps. For more details on setting up and using Kafka with Confluent, check out Kafka with Confluent.
Part 5 - Building a Spark Streaming Application with Schema Registry Integration
We can build a Spark Streaming application that works with the Confluent Schema Registry. This will help us serialize and deserialize data using Avro schemas. It will make sure our data is compatible and can handle different versions.
Step 1: Set Up Dependencies
First, we need to add the right dependencies to our project. If we use Maven, we include the following in our pom.xml. kafka-avro-serializer comes from Confluent's Maven repository (https://packages.confluent.io/maven/), which we must declare in the repositories section:
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.2.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.0.1</version>
  </dependency>
</dependencies>
We need to change the versions based on what we use in our project.
Step 2: Configure Spark Session
Next, we create a Spark session. The Kafka and Schema Registry addresses are not session settings. We pass the broker address to readStream and writeStream and the registry URL to our deserializer, so the session only needs a default checkpoint location:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Spark Streaming with Schema Registry")
  .config("spark.sql.streaming.checkpointLocation", "/path/to/checkpoint/dir")
  .getOrCreate()
Step 3: Read Data from Kafka
Now, we use the readStream method to get data from a Kafka topic. We give the topic name and keep the value column as binary, because it holds Avro bytes that casting to STRING would corrupt:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val kafkaStreamDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "your_topic_name")
  .load()

// value is BinaryType: the Confluent header followed by the Avro body
val avroDF = kafkaStreamDF.select("value")
Step 4: Deserialize Avro Data
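To read the Avro data, we use KafkaAvroDeserializer from Confluent, which looks up each message's schema in the Schema Registry by ID. The deserializer is not serializable, so we cannot create it on the driver and capture it in a UDF. Instead, we keep it in a holder object with a transient lazy field, which creates one instance per executor JVM. The UDF returns each record as a JSON-like string, because Spark cannot store a GenericRecord in a column:
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.functions.{col, udf}
import scala.collection.JavaConverters._

// One deserializer per executor JVM; @transient lazy keeps it from being shipped from the driver
object AvroDeserializerHolder extends Serializable {
  @transient lazy val deserializer: KafkaAvroDeserializer = {
    val d = new KafkaAvroDeserializer()
    d.configure(Map("schema.registry.url" -> "http://localhost:8081").asJava, false)
    d
  }
}

// Decode one Kafka value; the record comes back as a JSON-like string (null stays null)
val deserializeAvro = udf { (bytes: Array[Byte]) =>
  Option(AvroDeserializerHolder.deserializer.deserialize("your_topic_name", bytes)).map(_.toString).orNull
}

// Name the column "value" so the Kafka sink can write it
val processedDF = avroDF.select(deserializeAvro(col("value")).as("value"))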
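Under the hood, a deserializer such as KafkaAvroDeserializer reads the schema ID from the 5-byte header, looks up the writer schema, and decodes the body. The minimal Python sketch below shows the decoding half for the UserProfile record. It assumes the writer schema is exactly the Part 4 schema. A real deserializer fetches the schema by ID instead of hard-coding the field order. The function names are our own.

```python
import struct
from typing import Tuple


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode one zigzag varint (Avro int/long); return (value, next_position)."""
    shift = 0
    z = 0
    while True:
        byte = buf[pos]
        pos += 1
        z |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of this number
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos  # undo the zigzag mapping


def read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    length, pos = read_varint(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_user_profile(message: bytes) -> Tuple[int, dict]:
    """Split off the Confluent header, then read UserProfile fields in schema order."""
    magic, schema_id = struct.unpack_from(">bI", message, 0)
    if magic != 0:
        raise ValueError("not a Confluent-framed Avro message")
    pos = 5
    user = {}
    for name in ("userId", "firstName", "lastName", "email"):
        user[name], pos = read_string(message, pos)
    user["age"], pos = read_varint(message, pos)
    user["registered"] = message[pos] == 1
    return schema_id, user
```

This order-dependent reading is why the writer schema matters. The same bytes decoded with a different field list would produce wrong values, not an error.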
Step 5: Write to Sink
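At last, we write the processed data to where we want. Here is how we can send it back to another Kafka topic. The Kafka sink writes the value column (string or binary) as the message value. Here that value is JSON-like text, not Avro. To write Avro, we would encode it with to_avro or a Confluent serializer instead:
val query = processedDF.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output_topic_name")
  .option("checkpointLocation", "/path/to/checkpoint/dir")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()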
Conclusion
This Spark Streaming application can connect with the Confluent Schema Registry. It can handle real-time data streams with Avro serialization. If we want to learn more about using Kafka with Spark, we should look at this guide on Kafka with Spark.
Part 6 - Handling Schema Evolution in Streaming Applications
Handling schema evolution is important when we use Spark Structured Streaming with Confluent Schema Registry. Data structures change over time. So we need to make sure that different versions of schemas can work together. In this section, we will share best practices and show code examples for managing schema evolution well.
Understanding Schema Evolution
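Schema evolution lets us change a schema while consumers can still process messages written with earlier versions. The Confluent Schema Registry enforces this through compatibility modes. When we register a new version under a subject, the registry rejects it if the change breaks the subject's mode. For example, adding a field with a default value or removing a field is backward compatible, but adding a field without a default is not. Avro also allows some type promotions, such as int to long.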
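The core rule behind backward compatibility can be sketched in a few lines. A consumer using the new (reader) schema can decode data written with the old (writer) schema if every field the reader expects either exists in the writer schema or has a default. Avro fills in defaults for missing fields and skips writer fields the reader does not know. This simplified Python sketch assumes flat record schemas. It treats any type change as a problem, which is stricter than Avro (int can be promoted to long, for example), and it ignores unions and aliases, which the Schema Registry's real checker handles. The function name is hypothetical.

```python
import json
from typing import List


def backward_compatibility_problems(new_schema: str, old_schema: str) -> List[str]:
    """List reasons why new_schema (reader) could not read data written with old_schema."""
    reader = json.loads(new_schema)
    writer_fields = {f["name"]: f["type"] for f in json.loads(old_schema)["fields"]}
    problems = []
    for field in reader["fields"]:
        name = field["name"]
        if name not in writer_fields:
            # Old data has no value for this field, so the reader needs a default
            if "default" not in field:
                problems.append(f"new field '{name}' has no default")
        elif writer_fields[name] != field["type"]:
            problems.append(f"field '{name}' changed type")
    # Fields only in the old schema are fine: the reader skips them
    return problems
```

Forward compatibility is the same check with the arguments swapped, and full compatibility requires both checks to pass.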
Schema Compatibility Modes
The Schema Registry has different compatibility modes:
- Backward Compatibility: New schema can read data made by the last registered schema.
- Forward Compatibility: Old schema can read data made by the new schema.
- Full Compatibility: We need to keep both backward and forward compatibility.
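The default mode is BACKWARD. Each mode also has a transitive variant, such as BACKWARD_TRANSITIVE, which checks the new schema against all earlier versions instead of only the latest. NONE turns the check off. We should choose the mode per subject based on what our application needs. BACKWARD suits upgrading consumers first, and FORWARD suits upgrading producers first.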
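We can set the compatibility mode for a single subject through the Schema Registry REST API with PUT /config/{subject}. The minimal Python sketch below only builds that request; urllib.request.urlopen sends it. The helper name is ours, and the set of allowed modes mirrors the modes listed above.

```python
import json
import urllib.request

MODES = {"BACKWARD", "BACKWARD_TRANSITIVE", "FORWARD", "FORWARD_TRANSITIVE",
         "FULL", "FULL_TRANSITIVE", "NONE"}


def build_set_compatibility_request(base_url: str, subject: str, mode: str) -> urllib.request.Request:
    """Build a PUT /config/{subject} request that sets the subject's compatibility mode."""
    mode = mode.upper()
    if mode not in MODES:
        raise ValueError(f"unknown compatibility mode: {mode}")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/config/{subject}",
        data=json.dumps({"compatibility": mode}).encode("utf-8"),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="PUT",
    )
```

A per-subject setting overrides the registry-wide default, so one topic can use FULL while the rest keep BACKWARD.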
Registering a New Schema Version
When we change a schema, we need to register the new version with the Schema Registry. Here is an example of how to register a new schema version using the Confluent Schema Registry client.
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

# Configuration for Schema Registry
config = {
    'url': 'http://localhost:8081'  # URL of your Schema Registry
}
schema_registry_client = SchemaRegistryClient(config)

# Define a new schema version. The new "email" field has a default,
# so consumers using this schema can still read older records (BACKWARD compatible).
new_schema_str = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
"""
schema = Schema(new_schema_str, schema_type='AVRO')

# Register the new schema under the value subject of the "user-topic" topic (TopicNameStrategy)
subject = 'user-topic-value'
schema_id = schema_registry_client.register_schema(subject, schema)
print(f"Registered schema ID: {schema_id}")
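Before registering, we can ask the registry whether a candidate schema passes the subject's compatibility check. The endpoint is POST /compatibility/subjects/{subject}/versions/latest, and its response contains an is_compatible flag. The minimal Python sketch below builds that request and reads the answer using only the standard library. The helper names are ours.

```python
import json
import urllib.request


def build_compatibility_request(base_url: str, subject: str, schema_str: str,
                                version: str = "latest") -> urllib.request.Request:
    """Build a request that tests schema_str against a registered version of subject."""
    body = json.dumps({"schema": schema_str}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/compatibility/subjects/{subject}/versions/{version}",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


def is_compatible(response_body: bytes) -> bool:
    """Read the registry's answer, e.g. b'{"is_compatible": true}'."""
    return bool(json.loads(response_body).get("is_compatible", False))
```

In a deployment script, we would send the request with urllib.request.urlopen, pass the response body to is_compatible, and stop the rollout if it returns False.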
Using the Updated Schema in Spark Structured Streaming
After we register the new schema, we can use it in our Spark Structured Streaming application. We must make sure our data producer follows the new schema. Here is an example of how to read from Kafka with the updated schema:
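from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro  # needs the spark-avro package
from pyspark.sql.functions import expr

spark = SparkSession.builder \
    .appName("Schema Evolution Example") \
    .getOrCreate()

# Read from Kafka; key and value arrive as binary columns
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user-topic") \
    .load()

# Skip the 5-byte Confluent header, then decode the Avro body with the new schema
# (new_schema_str is the schema string we registered above)
avro_df = df.select(
    from_avro(expr("substring(value, 6, length(value) - 5)"), new_schema_str).alias("data")
).select("data.*")

# Process the streaming data
query = avro_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
from_avro applies one schema to every message, so this works only when all messages in the topic were written with new_schema_str. If the topic still holds records written with the older schema, we need schema-ID-aware decoding, such as KafkaAvroDeserializer as in Part 5. That way, each record is decoded with its own writer schema.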
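When the reader schema differs from the writer schema, Avro resolves the two field by field. Fields that exist only in the writer are ignored, fields that exist only in the reader take their default, and a reader field with neither is an error. The minimal Python sketch below applies that resolution step to a record that has already been decoded into a dict, assuming flat records. resolve_record is a hypothetical helper and is not how Spark or the Confluent deserializer expose resolution.

```python
import json


def resolve_record(decoded: dict, reader_schema: str) -> dict:
    """Project a record decoded with the writer schema onto the reader schema."""
    result = {}
    for field in json.loads(reader_schema)["fields"]:
        name = field["name"]
        if name in decoded:
            result[name] = decoded[name]
        elif "default" in field:
            result[name] = field["default"]  # e.g. None for a ["null", "string"] union
        else:
            raise ValueError(f"no value and no default for field '{name}'")
    # Keys in `decoded` that the reader does not declare are dropped
    return result
```

For example, an old User record without email comes out with email set to None under the evolved schema.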
Best Practices for Schema Evolution
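- Versioning: The Schema Registry numbers each version of a subject automatically. We should also keep our schema files in version control, optionally tagged with semantic versions, so every change is reviewed and traceable.
- Compatibility Testing: Before we deploy new schema versions, we should test them against the registry (POST /compatibility/subjects/{subject}/versions/latest) and against existing consumers.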
- Graceful Degradation: We should have fallback options in our consumers for unexpected schema changes.
- Documentation: Keep good documentation about schema changes to help communication among team members.
By following these practices and using the Confluent Schema Registry, we can manage schema evolution in our Spark Structured Streaming applications. This way, we can ensure strong and flexible data processing. For more information on Kafka and Confluent Schema Registry, check Kafka with Confluent.
Conclusion
In this article, we looked at how to use Spark Structured Streaming with Confluent Schema Registry. We talked about important things like setting up the environment, making configurations, and how schema evolution works. This guide helps us to create strong streaming applications with Kafka and Spark. It also makes sure that we can serialize data easily using Avro schemas.
For more information, we can check our resources about Kafka with Confluent and Kafka and Spark.