

Mastering the Integration of Spark Structured Streaming with the Confluent Schema Registry in Kafka

In this guide, we will look at how to connect Spark Structured Streaming with the Confluent Schema Registry. Together they make it much easier to handle real-time data in Apache Kafka. We will cover the important parts: setting up the environment, configuring Spark for structured streaming, and managing data with Avro schemas. When we understand how to use the Confluent Schema Registry well, we can build robust streaming apps that adapt to schema changes over time. This guide is for data engineers, software developers, and architects, and it gives us the knowledge and tools to make this integration work well.

In this chapter, we will discuss the following solutions:

  • Part 1 - Setting Up the Environment: We will learn how to get our system ready for Spark Structured Streaming and Kafka integration.
  • Part 2 - Configuring Spark for Structured Streaming: We will see the needed settings to make Spark ready for structured streaming apps.
  • Part 3 - Connecting to the Confluent Schema Registry: We will find out how to connect our Spark app to the Confluent Schema Registry.
  • Part 4 - Defining Avro Schemas for Data Serialization: We will look at how to define Avro schemas to serialize data well.
  • Part 5 - Building a Spark Streaming Application with Schema Registry Integration: We will go through the steps to create a Spark streaming app that uses the Schema Registry.
  • Part 6 - Handling Schema Evolution in Streaming Applications: We will learn how to manage and apply schema evolution in our streaming app.

By following this guide, we will be ready to connect Spark Structured Streaming with the Confluent Schema Registry and improve our Kafka streaming applications. For more tips on Kafka integration, check our resources on Kafka with Confluent and Kafka with Spark.

Part 1 - Setting Up the Environment

To connect Spark Structured Streaming with the Confluent Schema Registry, we need to set up the environment. This means we will install and configure Apache Spark, Apache Kafka, and the Confluent Schema Registry.

Prerequisites

  1. Java Development Kit (JDK): We need JDK 8 or newer.

    We can check if Java is installed by running:

    java -version
  2. Apache Spark: We need to download the Spark archive (for example spark-3.2.1-bin-hadoop3.2.tgz from the Apache Spark downloads page) and extract it:

    tar -xzf spark-3.2.1-bin-hadoop3.2.tgz
    cd spark-3.2.1-bin-hadoop3.2
  3. Apache Kafka: We also need to download the Kafka archive (for example kafka_2.12-2.8.0.tgz) and extract it:

    tar -xzf kafka_2.12-2.8.0.tgz
    cd kafka_2.12-2.8.0
  4. Confluent Schema Registry: To use the Schema Registry, we can either download it with the Confluent Platform or run it using Docker.

    • To run with Docker, we can use this command:
    docker run -d --name schema-registry \
      -p 8081:8081 \
      -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=<kafka-broker>:9092 \
      -e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 \
      confluentinc/cp-schema-registry:latest

    We need to replace <kafka-broker> with our Kafka broker address.

Environment Configuration

  1. Set Environment Variables: We should add these lines to our .bashrc or .bash_profile file to set the environment variables for Spark and Kafka.

    export SPARK_HOME=/path/to/spark-3.2.1-bin-hadoop3.2
    export KAFKA_HOME=/path/to/kafka_2.12-2.8.0
    export PATH=$PATH:$SPARK_HOME/bin:$KAFKA_HOME/bin

    After editing, we refresh our terminal:

    source ~/.bashrc
  2. Start Kafka and Zookeeper: Kafka needs Zookeeper to manage the cluster. We can start both Zookeeper and Kafka with these commands:

    # Start Zookeeper
    $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
    
    # Start Kafka Broker
    $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
  3. Verify Kafka Installation: We can create a topic to check our Kafka installation:

    $KAFKA_HOME/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

    We can list the topics to make sure it was created:

    $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  4. Testing Schema Registry: We can check if the Schema Registry is running by sending a GET request:

    curl http://localhost:8081/subjects

This setup will prepare our environment for connecting Spark Structured Streaming with the Confluent Schema Registry. For more details on Kafka configuration, we can look at our article on Kafka with Confluent. This will help us ensure we have all we need to continue with the integration.

Part 2 - Configuring Spark for Structured Streaming

To connect Spark Structured Streaming to the Confluent Schema Registry, we need to prepare our Spark environment with the right dependencies for streaming and for the Schema Registry client. Here are the steps to configure Spark for Structured Streaming.

Step 1: Set Up Spark Environment

First, we need to have Apache Spark installed. We can download it from the official Apache Spark website. We also need Java and, for the examples in this guide, Scala, since Spark applications run on the JVM.

Step 2: Adding Dependencies

To use Spark Structured Streaming with the Confluent Schema Registry, we must add some dependencies to our build file. If we use Maven, we add these in our pom.xml:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.3.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>7.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

Step 3: Configuration Properties

Next, we configure the Spark application with the connection details for the Kafka broker and the Confluent Schema Registry. Here is an example of creating a SparkSession that carries these settings:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
    .appName("Spark Structured Streaming with Schema Registry")
    .config("spark.sql.streaming.checkpointLocation", "/path/to/checkpoint/dir")
    // The two settings below are application-level values; Spark does not read them itself.
    // We pass the broker address to the Kafka source as an option and the registry URL to the
    // Avro deserializer when we build the streaming query.
    .config("kafka.bootstrap.servers", "localhost:9092")
    .config("schema.registry.url", "http://localhost:8081")
    .getOrCreate()

Step 4: Enabling Avro Support

To enable Avro serialization and deserialization, we also record the Avro converter settings. These property names come from Kafka Connect's AvroConverter; Spark does not act on them directly, but keeping them on the Spark configuration gives our application one place to read the converter class and the Schema Registry URL from when it sets up Avro handling:

// Application-level settings; Spark itself does not apply these keys
spark.conf.set("key.converter", "io.confluent.connect.avro.AvroConverter")
spark.conf.set("value.converter", "io.confluent.connect.avro.AvroConverter")
spark.conf.set("value.converter.schema.registry.url", "http://localhost:8081")

Step 5: Running Your Spark Application

After we configure Spark, we can run our application using spark-submit. We must include the right jars in the classpath. Here is an example command to submit our Spark job:

spark-submit \
  --class com.example.YourSparkApp \
  --master local[2] \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,io.confluent:kafka-schema-registry-client:7.4.0 \
  target/your-spark-app-1.0.jar

Conclusion

By following these steps, we have a Spark environment configured and ready for Structured Streaming with the Confluent Schema Registry. For more details on Kafka integration with Spark, check our resource on Kafka with Spark.

This setup helps our Spark application read from and write to Kafka topics easily while managing Avro schemas.

Part 3 - Connecting to the Confluent Schema Registry

To connect our Spark Structured Streaming apps to the Confluent Schema Registry, we must set up our Spark app to use the Schema Registry. This helps us manage Avro schemas. The Schema Registry is a central place for our schemas. It keeps our data consistent when it goes between Kafka and our Spark app.
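As a quick sanity check that the registry is reachable from JVM code, we can list the registered subjects with the client class we also use later in this part. This is a minimal sketch, assuming the registry runs at http://localhost:8081:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

// The second argument (100) is the size of the client's local schema cache
val client = new CachedSchemaRegistryClient("http://localhost:8081", 100)

// Prints every registered subject, for example "your_topic-value"
client.getAllSubjects().forEach(subject => println(subject))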

Prerequisites

  1. Confluent Schema Registry: We need to make sure that the Confluent Schema Registry is running. We can follow the Kafka with Confluent guide to install and set it up.
  2. Spark Structured Streaming: We should have Apache Spark set up and ready to work with Kafka.

Dependencies

We need to add the following dependencies to our build. If we use Maven, we add them to our pom.xml (an sbt equivalent is shown after this block):

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>7.0.0</version> <!-- Use the right version -->
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.2.0</version> <!-- Use the right version -->
</dependency>
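If the project uses sbt instead of Maven, an equivalent build.sbt fragment with the same versions looks like this. Confluent artifacts are not on Maven Central, so the Confluent repository must be added as a resolver:

// build.sbt
resolvers += "Confluent" at "https://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql-kafka-0-10"         % "3.2.0",
  "io.confluent"      % "kafka-schema-registry-client" % "7.0.0"
)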

Configuration

To connect to the Schema Registry from our Spark app, we need to set these properties in our Spark configuration:

val spark = SparkSession.builder()
    .appName("Spark Structured Streaming with Schema Registry")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("schema.registry.url", "http://localhost:8081") // Change the URL if needed
    .config("value.converter", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
    .config("value.converter.schema.registry.url", "http://localhost:8081") // Schema Registry URL
    .getOrCreate()

Kafka Consumer Configuration

When we read from Kafka, the message value must be decoded with Confluent's Avro deserializer. Here is how we can do this in our Spark Structured Streaming app using the GenericAvroSerde, which wraps the KafkaAvroDeserializer:

val kafkaStream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092") // Kafka broker address
    .option("subscribe", "your_topic") // Our Kafka topic
    .option("startingOffsets", "earliest") // Start from the first message
    .load()

// Deserialize the Avro data with the Schema Registry
import java.util.Collections
import org.apache.spark.sql.Encoders
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde

import spark.implicits._

// Keep the Kafka value as raw bytes; casting Avro binary to STRING would corrupt the payload
val deserializedData = kafkaStream
    .select("value").as[Array[Byte]](Encoders.BINARY)
    .mapPartitions { records =>
        // Build the serde on the executors; Kafka serdes are not serializable, so they
        // cannot be created on the driver and captured in the closure
        val avroSerde = new GenericAvroSerde()
        avroSerde.configure(Collections.singletonMap("schema.registry.url", "http://localhost:8081"), false)
        records.map { bytes =>
            // The deserializer strips the Confluent wire-format header, resolves the schema by ID,
            // and returns a GenericRecord, rendered here as a JSON string
            avroSerde.deserializer().deserialize("your_topic", bytes).toString
        }
    }

Error Handling

We need to handle possible errors when we connect to the Schema Registry. We can add a try-catch block around our connection code to deal with any problems that may happen.

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

try {
    // Try to connect to the Schema Registry (100 is the size of the client's schema cache)
    val schemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:8081", 100)
    schemaRegistryClient.getAllSubjects() // Force a round trip so connection problems surface here
} catch {
    case e: Exception =>
        println(s"Failed to connect to Schema Registry: ${e.getMessage}")
}

This setup helps our Spark Structured Streaming app work with the Confluent Schema Registry. It makes sure our Kafka messages are properly serialized and deserialized with the defined Avro schemas. For more info on Kafka integration, we can check out Kafka with Spark.

Part 4 - Defining Avro Schemas for Data Serialization

In this section, we will define Avro schemas. These schemas are central to data serialization when we connect Spark Structured Streaming with the Confluent Schema Registry. Avro is a widely used serialization format that encodes data in a compact, fast binary form, which matters a lot for streaming applications.

Understanding Avro Schemas

An Avro schema describes the structure of the data we serialize. We write Avro schemas in JSON format. A schema usually has these parts:

  • Type: This tells the data type (like record, enum, array).
  • Name: This is the name of the schema.
  • Fields: This is a list of fields in the schema. Each field has a name and type.

Example Avro Schema

Here is an example of an Avro schema for a user profile that we can send through Kafka topics:

{
  "type": "record",
  "name": "UserProfile",
  "namespace": "com.example",
  "fields": [
    {
      "name": "userId",
      "type": "string"
    },
    {
      "name": "firstName",
      "type": "string"
    },
    {
      "name": "lastName",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int"
    },
    {
      "name": "registered",
      "type": "boolean"
    }
  ]
}

Registering Avro Schemas with Confluent Schema Registry

After we define our Avro schema, we need to register it with the Confluent Schema Registry. This helps us manage schema versions and deal with schema changes in our streaming apps.

To register the schema, we can use the Schema Registry REST API. Here is an example using curl:

curl -X POST \
  http://localhost:8081/subjects/UserProfile-value/versions \
  -H "Content-Type: application/json" \
  -d '{
        "schema": "{\"type\":\"record\",\"name\":\"UserProfile\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"userId\",\"type\":\"string\"},{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"registered\",\"type\":\"boolean\"}]}"
      }'
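To confirm the registration, or to look up the current version from application code, we can ask the registry for the latest schema metadata of the subject. A short sketch using the Confluent client (the same class we use in Part 3):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

val client = new CachedSchemaRegistryClient("http://localhost:8081", 100)

// Fetch the latest registered version for the subject we just created
val metadata = client.getLatestSchemaMetadata("UserProfile-value")
println(s"id=${metadata.getId}, version=${metadata.getVersion}")
println(metadata.getSchema) // The registered Avro schema as a JSON string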

Serializing Data with Avro in Spark

In our Spark application, we use the Avro schema to serialize data before we send it to Kafka. Below is a sample code snippet. It shows how to serialize a UserProfile object using the Avro schema:

import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter}
import org.apache.avro.io.{DatumWriter, Encoder, EncoderFactory}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val schemaString = """{
  "type": "record",
  "name": "UserProfile",
  "namespace": "com.example",
  "fields": [
    {"name": "userId", "type": "string"},
    {"name": "firstName", "type": "string"},
    {"name": "lastName", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "registered", "type": "boolean"}
  ]
}"""

val schema: Schema = new Schema.Parser().parse(schemaString)
val record = new GenericData.Record(schema)
record.put("userId", "1234")
record.put("firstName", "John")
record.put("lastName", "Doe")
record.put("email", "john.doe@example.com")
record.put("age", 30)
record.put("registered", true)

// kafkaProducerConfig is assumed to be defined elsewhere as java.util.Properties with
// bootstrap.servers and a ByteArraySerializer for the value
val producer = new KafkaProducer[String, Array[Byte]](kafkaProducerConfig)
val byteArrayOutputStream = new ByteArrayOutputStream
// Use a GenericDatumWriter because we are writing a GenericData.Record
val datumWriter: DatumWriter[GenericData.Record] = new GenericDatumWriter[GenericData.Record](schema)
val encoder: Encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)

datumWriter.write(record, encoder)
encoder.flush()
byteArrayOutputStream.close()

val serializedData: Array[Byte] = byteArrayOutputStream.toByteArray
val producerRecord = new ProducerRecord[String, Array[Byte]]("user-profiles", serializedData)
producer.send(producerRecord)
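One caveat: the bytes produced above are plain Avro without the Confluent wire-format header (magic byte plus schema ID), so consumers that decode through the Schema Registry would not be able to read them. In practice it is usually simpler to produce with Confluent's KafkaAvroSerializer, which registers the schema if needed, adds the header, and serializes the GenericRecord for us. A sketch, assuming the io.confluent:kafka-avro-serializer artifact is on the classpath:

import java.util.Properties
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("schema.registry.url", "http://localhost:8081")

// Reuses the GenericData.Record built above; the serializer handles header and registration
val avroProducer = new KafkaProducer[String, GenericRecord](props)
avroProducer.send(new ProducerRecord[String, GenericRecord]("user-profiles", record.get("userId").toString, record))
avroProducer.flush()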

Conclusion

Defining Avro schemas for data serialization is an important step when we integrate Spark Structured Streaming with the Confluent Schema Registry. By sticking to the schema definitions, we keep our data consistent and compatible across different versions of our streaming apps. For more details on setting up and using Kafka with Confluent, check out Kafka with Confluent.

Part 5 - Building a Spark Streaming Application with Schema Registry Integration

In this part we build a Spark Streaming application that works with the Confluent Schema Registry, so it can serialize and deserialize data using Avro schemas while staying compatible across schema versions.

Step 1: Set Up Dependencies

First, we need to add the right dependencies to our project. If we use Maven, we should include the following in our pom.xml:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.2.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.0.1</version>
</dependency>

We need to change the versions based on what we use in our project.

Step 2: Configure Spark Session

Next, we create a Spark session. We need to set it up to connect to Kafka and the Schema Registry:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
    .appName("Spark Streaming with Schema Registry")
    .config("spark.sql.streaming.checkpointLocation", "/path/to/checkpoint/dir")
    .config("kafka.bootstrap.servers", "localhost:9092")
    .config("schema.registry.url", "http://localhost:8081")
    .getOrCreate()

Step 3: Read Data from Kafka

Now we use the readStream method to consume data from a Kafka topic. We specify the topic name and keep the raw message value so we can decode it in the next step:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val kafkaStreamDF = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "your_topic_name")
    .load()

// Keep the value as raw bytes; Confluent Avro payloads are binary and must not be cast to STRING
val avroDF = kafkaStreamDF.selectExpr("value AS avro_bytes")

Step 4: Deserialize Avro Data

To decode the Avro data we use the Schema Registry through Confluent's KafkaAvroDeserializer, wrapped in a small function that we register as a UDF:

import java.util.Collections
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord

// Create one deserializer per JVM, lazily; the KafkaAvroDeserializer is not serializable,
// so it should not be built on the driver and shipped inside the UDF closure
object AvroDeserializerHolder {
    private val schemaRegistryUrl = "http://localhost:8081"
    lazy val deserializer: KafkaAvroDeserializer = {
        val d = new KafkaAvroDeserializer()
        d.configure(Collections.singletonMap("schema.registry.url", schemaRegistryUrl), false)
        d
    }
}

// Return the decoded record as a JSON string, since GenericRecord has no Spark SQL type
def deserializeAvro(data: Array[Byte]): String = {
    AvroDeserializerHolder.deserializer.deserialize("your_topic_name", data)
        .asInstanceOf[GenericRecord].toString
}

// Register the UDF
spark.udf.register("deserializeAvro", deserializeAvro _)

val processedDF = avroDF.selectExpr("deserializeAvro(avro_bytes) as avro_record")

Step 5: Write to Sink

Finally, we write the processed data to a sink. Here is how we can send it back to another Kafka topic:

// The Kafka sink expects a column named "value" (string or binary)
val query = processedDF.selectExpr("avro_record AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "output_topic_name")
    .option("checkpointLocation", "/path/to/checkpoint/dir")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .start()

query.awaitTermination()
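While developing, it can help to print the decoded records to the console instead of writing them back to Kafka. A small variation of the query above, using the same processedDF:

// Console sink: prints each micro-batch to stdout, useful for local debugging
val debugQuery = processedDF.writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", "false")
    .start()

debugQuery.awaitTermination()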

Conclusion

This Spark Streaming application can connect with the Confluent Schema Registry. It can handle real-time data streams with Avro serialization. If we want to learn more about using Kafka with Spark, we should look at this guide on Kafka with Spark.

Part 6 - Handling Schema Evolution in Streaming Applications

Handling schema evolution is important when we use Spark Structured Streaming with the Confluent Schema Registry. Data structures change over time, so we need to make sure that different schema versions stay compatible with each other. In this section, we share best practices and show code examples for managing schema evolution well.

Understanding Schema Evolution

Schema evolution lets us change existing schemas while still allowing consumers to process older messages. The Confluent Schema Registry supports different evolution strategies: we can add or remove fields and change field types as long as the changes remain compatible.

Schema Compatibility Modes

The Schema Registry has different compatibility modes:

  • Backward Compatibility: Consumers using the new schema can read data written with the previous registered schema.
  • Forward Compatibility: Consumers using the previous schema can read data written with the new schema.
  • Full Compatibility: Both backward and forward compatibility are maintained.

We should choose the right compatibility mode when we register schemas based on what our application needs.
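We can set the mode per subject from code as well. Here is a minimal sketch using the JVM Schema Registry client from the earlier parts (the same change can be made with a PUT request to the registry's /config/<subject> REST endpoint):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

val client = new CachedSchemaRegistryClient("http://localhost:8081", 100)

// Require that every new "user-value" schema can read data written with the previous version
client.updateCompatibility("user-value", "BACKWARD")
println(client.getCompatibility("user-value")) // Should print BACKWARD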

Registering a New Schema Version

When we change a schema, we need to register the new version with the Schema Registry. Here is an example of how to register a new schema version using the Confluent Schema Registry client.

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

# Configuration for Schema Registry
config = {
    'url': 'http://localhost:8081'  # URL of your Schema Registry
}
schema_registry_client = SchemaRegistryClient(config)

# Define a new schema. The added "email" field gets a default value so the new
# schema stays backward compatible and can still read records written without it.
new_schema_str = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "email", "type": "string", "default": ""}
  ]
}
"""
schema = Schema(new_schema_str, schema_type='AVRO')

# Register the new schema
subject = 'user-value'
schema_registry_client.register_schema(subject, schema)

Using the Updated Schema in Spark Structured Streaming

After we register the new schema, we can use it in our Spark Structured Streaming application. We must make sure our data producer follows the new schema. Here is an example of how to read from Kafka with the updated schema:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
from pyspark.sql.avro.functions import from_avro

spark = SparkSession.builder \
    .appName("Schema Evolution Example") \
    .getOrCreate()

# Read from Kafka with the updated schema
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user-topic") \
    .load()

# Deserialize the Avro data using the new schema. This assumes the spark-avro package
# (org.apache.spark:spark-avro_2.12) is available to Spark. Confluent-serialized values
# carry a 5-byte header (magic byte plus schema ID) that we strip before decoding.
avro_df = df.select(expr("substring(value, 6, length(value) - 5)").alias("avro_payload")) \
    .select(from_avro(col("avro_payload"), new_schema_str).alias("data")) \
    .select("data.*")

# Process the streaming data
query = avro_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

Best Practices for Schema Evolution

  1. Versioning: We should always version our schemas. Use semantic versioning to track changes.
  2. Compatibility Testing: Before we deploy new schema versions, we must test that they are compatible with existing consumers (see the sketch after this list).
  3. Graceful Degradation: We should have fallback options in our consumers for unexpected schema changes.
  4. Documentation: Keep good documentation about schema changes to help communication among team members.
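For the compatibility testing in point 2, the Schema Registry itself can run the check before we register anything. Below is a rough sketch with the JVM client used in the earlier parts; the overload shown takes an org.apache.avro.Schema and is deprecated in newer client versions (which prefer ParsedSchema), but it illustrates the idea:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.Schema

val client = new CachedSchemaRegistryClient("http://localhost:8081", 100)

// Candidate schema: the evolved User record with a defaulted email field
val candidate = new Schema.Parser().parse(
  """{"type":"record","name":"User","fields":[
    |  {"name":"id","type":"int"},
    |  {"name":"name","type":"string"},
    |  {"name":"age","type":"int"},
    |  {"name":"email","type":"string","default":""}]}""".stripMargin)

// Returns true if the candidate is compatible with the subject's latest registered version
// under the subject's current compatibility setting
if (client.testCompatibility("user-value", candidate)) {
  println("Safe to register the new schema")
} else {
  println("Incompatible change - do not deploy")
}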

By following these practices and using the Confluent Schema Registry, we can manage schema evolution in our Spark Structured Streaming applications. This way, we can ensure strong and flexible data processing. For more information on Kafka and Confluent Schema Registry, check Kafka with Confluent.

Conclusion

In this article, we looked at how to use Spark Structured Streaming with the Confluent Schema Registry. We covered setting up the environment, the required configuration, and how schema evolution works. This guide helps us create robust streaming applications with Kafka and Spark, and it makes sure we can serialize data easily using Avro schemas.

For more information, we can check our resources about Kafka with Confluent and Kafka and Spark.
