
[SOLVED] Mastering JSON Records in Kafka with Structured Streaming

In this guide, we will look at how to read JSON records from Kafka using Spark’s Structured Streaming. Kafka is a powerful tool for streaming data, and JSON is a popular format for exchanging it, so knowing how to work with JSON messages in Kafka is an important part of modern data processing. We will go step by step, from setting up Kafka to processing the data and saving the results.

Here is a quick look at what we will cover in this guide:

  • Part 1 - Setting Up Your Kafka Environment: We will learn how to install and set up Kafka for best performance.
  • Part 2 - Configuring Spark Structured Streaming: We will understand how to connect Spark with Kafka.
  • Part 3 - Reading JSON Messages from Kafka Topic: We will see how to read and understand JSON records from Kafka topics.
  • Part 4 - Processing JSON Data with Spark DataFrame: We will explore how to change and analyze JSON data using Spark DataFrames.
  • Part 5 - Writing Processed Data to a Sink: We will learn how to save the processed data to different places.
  • Part 6 - Handling Schema Evolution in JSON Data: We will understand how to deal with changes in the JSON structure over time.

By the end of this article, we will know how to use Spark Structured Streaming to read and process JSON records from Kafka. This will help us build robust data processing applications. If you want to learn more about Kafka, you can check our articles on how to read from Kafka and Kafka with Spark.

Let’s get started!

Part 1 - Setting Up Your Kafka Environment

We need to read records in JSON format from Kafka using Structured Streaming. The first step is to set up our Kafka environment correctly. Here is a simple guide to help us get started.

Prerequisites

  1. Java Installation: We must have Java 8 or higher on our system. We can check if it is installed by running:

    java -version
  2. Apache Kafka: We should download the latest version of Apache Kafka from the official Kafka website. After that, we extract the files to a place we want.

  3. Apache ZooKeeper: Kafka needs ZooKeeper to manage its cluster. The good news is that the Kafka download ships with a bundled ZooKeeper, so we do not need a separate installation.

Step 1: Start ZooKeeper

Let’s go to the Kafka installation folder and start the ZooKeeper server:

cd kafka_2.12-<version>
bin/zookeeper-server-start.sh config/zookeeper.properties

Step 2: Start Kafka Broker

Now, we open a new terminal window and start the Kafka broker:

bin/kafka-server-start.sh config/server.properties

Step 3: Create a Kafka Topic

Before we can read JSON messages, we need to create a topic. Here is how we create a topic called json-topic:

bin/kafka-topics.sh --create --topic json-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

To check if the topic is created, we can list all topics:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Step 4: Configure Kafka Producer (Optional)

If we want to test reading JSON messages, we can send some JSON data to our topic. Here is a quick example using the console producer:

bin/kafka-console-producer.sh --topic json-topic --bootstrap-server localhost:9092

Then we can type our JSON records like this:

{"name": "John Doe", "age": 30}
{"name": "Jane Doe", "age": 25}

Press Ctrl+C to stop the producer.

Additional Configuration

We can also tune properties such as the broker ID, log directories, and listeners in the server.properties file in the config folder. Here are some important settings we might want to change (a short sample follows the list):

  • Broker ID: This is a unique ID for the Kafka broker.
  • Log Directory: This is where Kafka keeps its log files.
  • Listeners: These are the network settings for the Kafka broker.
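
For reference, a minimal server.properties might contain entries like the ones below. The values are only illustrative defaults, so we should adjust them for our own machine:

# Unique ID for this broker
broker.id=0

# Where Kafka stores its log segments
log.dirs=/tmp/kafka-logs

# Network address the broker listens on
listeners=PLAINTEXT://localhost:9092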

Testing the Setup

We can test our Kafka setup by using the console consumer to read messages from our topic:

bin/kafka-console-consumer.sh --topic json-topic --from-beginning --bootstrap-server localhost:9092

This command will show any messages sent to json-topic. This way, we can check if our Kafka environment is working well.

By following these steps, we will have a working Kafka environment ready to read records in JSON format using Apache Spark Structured Streaming. For more help on setting up Kafka, we can look at this Kafka setup guide.

Part 2 - Configuring Spark Structured Streaming

To read JSON records from Kafka using Spark Structured Streaming, we need to configure Spark correctly. This part will help us with the setup and configuration to connect Apache Spark with Apache Kafka.

Prerequisites

  1. Apache Spark: We must have Apache Spark installed. We can download it from the official Spark website.

  2. Kafka Dependencies: We need to add Kafka dependencies to our Spark project. If we use Maven, we add the following to our pom.xml:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>3.2.1</version> <!-- Make sure this version works with our Spark setup -->
    </dependency>

Spark Configuration

To configure Spark for Structured Streaming with Kafka, we must set different properties. These properties connect us to the Kafka cluster and control how the Spark streaming app behaves.

  1. Spark Session Initialization: First, we create a SparkSession in our application. This session helps us read data from Kafka.

    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession.builder()
        .appName("Kafka JSON Streaming")
        .master("local[*]") // Change this to fit our cluster settings
        .getOrCreate()
  2. Setting Kafka Parameters: Next, we set the Kafka parameters needed to read from the Kafka topic. The main properties are the Kafka broker addresses and the topic name.

    val kafkaBrokers = "localhost:9092" // Change with our Kafka broker address
    val kafkaTopic = "your-topic-name" // Change with our Kafka topic name
  3. Reading from Kafka: We use the readStream method of SparkSession to read data from the Kafka topic. The code below shows how to set up the streaming DataFrame for reading JSON messages from Kafka:

    import org.apache.spark.sql.functions._
    
    val kafkaStreamDF = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaBrokers)
        .option("subscribe", kafkaTopic)
        .load()
  4. Selecting and Parsing JSON Data: The Kafka source gives us each record’s value in binary format. We first cast the value column to a string and then use the from_json function to parse the JSON string into structured columns. For this, we define a schema that matches our JSON data.

    import org.apache.spark.sql.types._
    
    val jsonSchema = StructType(Array(
        StructField("field1", StringType),
        StructField("field2", IntegerType),
        // Add more fields that match our JSON structure
    ))
    
    val parsedDF = kafkaStreamDF.selectExpr("CAST(value AS STRING) as json_value")
        .select(from_json(col("json_value"), jsonSchema).alias("data"))
        .select("data.*")
  5. Output Mode: We need to choose the output mode for our streaming query. For example, if we want to write the processed data to the console for testing, we can set the output mode to append.

    val query = parsedDF.writeStream
        .outputMode("append")
        .format("console")
        .start()
    
    query.awaitTermination()

Additional Configuration Options

We can also set extra options like the ones below (a short example follows the list):

  • Starting Offsets: We can use the startingOffsets option to decide where to start reading messages in the topic (like earliest or latest).
  • Max Rate: We can control the maximum number of records per second with maxOffsetsPerTrigger.
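
The option names are the same in the Scala and Python APIs. Here is a minimal PySpark sketch of both options, assuming an existing SparkSession called spark and placeholder broker and topic names:

# Sketch: Kafka source with explicit starting offsets and a per-trigger rate cap
kafka_stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "your-topic-name") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", "1000") \
    .load()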

Conclusion

By setting up Spark Structured Streaming with these settings, we can read JSON records from Kafka topics. This setup is important for building real-time streaming apps that process data as it arrives in Kafka. For more details on connecting Spark with Kafka, we can check our guide on integrating Spark Structured Streaming with Kafka.

Part 3 - Reading JSON Messages from Kafka Topic

In this part, we will learn how to read records in JSON format from a Kafka topic using Apache Spark Structured Streaming. First, we need to set up a Spark session to connect to our Kafka broker. Let’s go through the steps with example code.

Step 1: Set Up Spark Session

First, we need to make sure that we have Spark with Kafka support. We can add the Kafka package when we start our Spark application. Here is how we can set up our Spark session:

from pyspark.sql import SparkSession

# Create Spark session with Kafka support
spark = SparkSession.builder \
    .appName("Kafka JSON Reader") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0") \
    .getOrCreate()

Step 2: Read from Kafka Topic

Next, we can read JSON data from a Kafka topic. We do this by using the readStream method and telling it to use Kafka as the source. We also need to give the Kafka broker address and the topic name.

# Define Kafka parameters
kafka_broker = "localhost:9092"  # Change this to your Kafka broker address
topic_name = "your_topic_name"     # Change this to your topic name

# Read JSON messages from Kafka topic
kafka_stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", topic_name) \
    .load()

# Print schema of incoming data
kafka_stream_df.printSchema()

Step 3: Extract JSON Data

The data we read from Kafka arrives in binary format in the value column. We cast it to a string with the selectExpr method and then parse it as JSON with from_json.

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the schema for the JSON data
json_schema = StructType([
    StructField("field1", StringType(), True),
    StructField("field2", IntegerType(), True),
    # Add more fields based on your JSON structure
])

# Select and parse the JSON data
json_df = kafka_stream_df.selectExpr("CAST(value AS STRING) as json_value") \
    .select(from_json(col("json_value"), json_schema).alias("data")) \
    .select("data.*")

# Print the schema of the parsed JSON DataFrame
json_df.printSchema()

Step 4: Start the Streaming Query

Now that we have the DataFrame with the JSON data, we can start the streaming query to process it. Here is an example to start a query that writes the output to the console:

# Start the query to write the output to the console
query = json_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Await termination of the query
query.awaitTermination()

Important Notes

  • Make sure your Kafka topic has JSON messages that match the schema we defined. You can read more about handling topics in Kafka here.
  • Change the schema to fit your JSON structure for proper parsing.
  • For more details on setting up Spark with Kafka, check this guide.

This way, we can read and process JSON messages from a Kafka topic using Spark Structured Streaming.

Part 4 - Processing JSON Data with Spark DataFrame

After we read the JSON messages from our Kafka topic using Spark Structured Streaming, the next step is to work with this JSON data using Spark DataFrames. Spark DataFrames give us a convenient way to manage structured data and let us apply many transformations efficiently.

To start processing JSON data with Spark DataFrame, we can follow these simple steps:

  1. Read JSON Data into DataFrame: After we stream from Kafka, we can change the incoming JSON messages into a DataFrame. We need to import the right Spark SQL functions first.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import *
    
    # Create Spark session
    spark = SparkSession.builder \
        .appName("Kafka JSON Processing") \
        .getOrCreate()
    
    # Read JSON messages from Kafka
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "topic_name") \
        .load()
    
    # Extract the JSON data from the Kafka value column
    json_df = df.selectExpr("CAST(value AS STRING) AS json")
  2. Parse JSON Data: We will use the from_json function to change the JSON strings into structured DataFrames. We need to define a schema so Spark knows the structure of our JSON data.

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    
    # Define the schema for your JSON data
    json_schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("timestamp", StringType(), True)
    ])
    
    # Parse JSON data
    parsed_df = json_df.select(from_json(col("json"), json_schema).alias("data")).select("data.*")
  3. Transform Data: Now we can make different transformations on the DataFrame. This includes filtering, adding new data, or changing the current data.

    # Example: Filter records based on a condition
    filtered_df = parsed_df.filter(col("id") > 100)
    
    # Example: Add a new column
    transformed_df = filtered_df.withColumn("processed_timestamp", current_timestamp())
  4. Write Processed Data: Finally, we can write the processed DataFrame to a place, like another Kafka topic, a file, or a database.

    query = transformed_df.writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "processed_topic") \
        .option("checkpointLocation", "/path/to/checkpoint/dir") \
        .start()
    
    query.awaitTermination()

This process helps us handle and process JSON data that comes from Kafka using Spark DataFrames. If we want to learn more about how to connect Spark with Kafka, we can look at this link on integrating Spark Structured Streaming with Kafka.

By using Spark DataFrames, we can apply complex transformations and analysis to streaming JSON data easily and at scale.

Part 5 - Writing Processed Data to a Sink

After we process JSON data with Spark DataFrames, the next step is to write the data to a sink. We can do this using the different output formats that Spark supports, such as Parquet, JSON, or even sending data back to a Kafka topic. Note that the examples below use the batch write API on a static DataFrame; if processedData is a streaming DataFrame, we need writeStream with a checkpointLocation (as in Part 4) or the foreachBatch pattern shown later in this part. Here are some common ways to do this.

Writing to a Parquet File

Parquet is a columnar file format that works well for big data processing, which makes it a good choice for storing processed data.

# We assume 'processedData' is the DataFrame with our processed data
processedData.write \
    .mode("overwrite") \
    .parquet("/path/to/output/directory")

Writing to a JSON File

If we want to keep our data in JSON format, we can write it directly to JSON files.

processedData.write \
    .mode("overwrite") \
    .json("/path/to/output/directory")

Writing Back to a Kafka Topic

If we want to send the processed data back to a Kafka topic, we can use this method. We need to make sure we have the right Kafka settings.

processedData.selectExpr("CAST(key AS STRING)", "to_json(struct(*)) AS value") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output-topic") \
    .save()

In this example:

  • key is the Kafka message key. We can set it to a specific value or get it from our DataFrame.
  • to_json(struct(*)) changes all columns of the DataFrame into a JSON string. This string will be the Kafka message value.

Writing to Other Sinks

We can also write processed data to other places like:

  • Databases (using JDBC)
  • HDFS or cloud storage
  • NoSQL databases (like MongoDB or Cassandra)

To write to a database, we can use:

processedData.write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/database_name") \
    .option("dbtable", "table_name") \
    .option("user", "username") \
    .option("password", "password") \
    .mode("append") \
    .save()
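
The write calls above work on a static DataFrame. If processedData is a streaming DataFrame, one common pattern is foreachBatch, which hands each micro-batch to a function where the batch writers (including JDBC) can be used. Here is a minimal sketch, reusing the same placeholder MySQL connection details as above:

def write_batch_to_mysql(batch_df, batch_id):
    # Each micro-batch arrives as a normal (static) DataFrame,
    # so the batch JDBC writer can be used inside this function.
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/database_name") \
        .option("dbtable", "table_name") \
        .option("user", "username") \
        .option("password", "password") \
        .mode("append") \
        .save()

query = processedData.writeStream \
    .foreachBatch(write_batch_to_mysql) \
    .option("checkpointLocation", "/path/to/checkpoint/dir") \
    .start()

query.awaitTermination()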

Configurations and Options

When we write data, we can set options like the following (a short example comes after the list):

  • mode: This tells how to handle existing data (like “append”, “overwrite”, “ignore”, “error”).
  • partitionBy: This is used to split the data when we write to file formats like Parquet.
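
For example, here is a small sketch that appends new data and partitions the Parquet output by a column. The column name event_date is only a placeholder; we should use a column that actually exists in our DataFrame:

processedData.write \
    .mode("append") \
    .partitionBy("event_date") \
    .parquet("/path/to/output/directory")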

We should change the settings based on our needs and what we want to do. For more details on writing data and setting up sinks, we can check the Kafka documentation or see how to integrate Spark with Kafka.

Part 6 - Handling Schema Evolution in JSON Data

Handling schema evolution in JSON data is very important when we work with Kafka and Spark Structured Streaming. As our application grows, the structure of JSON messages can change. This change can cause problems in processing data if we do not manage the schema well. Here are some easy ways to handle schema evolution.

1. Use Schema Registry

One good way to manage schema evolution is to use a Schema Registry. We can use Confluent Schema Registry to store and manage schemas. It helps us define the schema for our JSON data and follow compatibility rules as the schema changes.

Steps to Use Schema Registry:

  • Install Confluent Schema Registry: We should follow the installation guide from the Confluent documentation.

  • Register Schemas: We can register our schema using the REST API from the Schema Registry.

curl -X POST -H "Content-Type: application/json" \
     --data '{
       "schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"
     }' \
     http://localhost:8081/subjects/User-value/versions
  • Use Avro or JSON Schema: We must make sure that our producers and consumers know the schema version to serialize and deserialize the data correctly.

2. Leverage Spark’s Support for Evolving Schemas

Spark DataFrame API has built-in features to handle schema evolution. If our data schema changes often, we can use these features:

  • StructType for Nested Structures: We can define our schema with StructType. This allows us to create complex nested data structures.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
  • Allow Null Values: When we define schemas, we should make fields nullable. This lets us handle changes like adding or removing fields, as the sketch below shows.
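
As a quick illustration, here is a minimal batch sketch, assuming an existing SparkSession called spark. When a nullable field is missing from an incoming record, from_json simply returns null for that field instead of failing:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)  # nullable, so older records without "age" still parse
])

# A record produced before the "age" field was added
old_record = spark.createDataFrame([('{"name": "Alice"}',)], ["json_value"])

old_record.select(from_json(col("json_value"), schema).alias("data")) \
    .select("data.*") \
    .show()  # "age" comes back as null for this record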

3. Implement DataFrame API Functions

We can use DataFrame transformations to handle optional fields. For example, if our JSON messages now include an email field that is not always present, we can add it to the schema as a nullable field, and missing values will simply come through as null.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
    .appName("KafkaStreaming") \
    .getOrCreate()

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "your-topic") \
    .load()

# Assume df contains JSON data
json_df = df.selectExpr("CAST(value AS STRING)")

# Define the schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("email", StringType(), True)  # New field added
])

# Parse JSON and handle schema evolution
parsed_df = json_df.select(from_json(col("value"), schema).alias("data")).select("data.*")

# Processing DataFrame
parsed_df.writeStream \
    .format("console") \
    .start() \
    .awaitTermination()

4. Maintain Compatibility

When we change our schema:

  • Backward Compatibility: Consumers using the new schema version must still be able to read data written with older versions.
  • Forward Compatibility: Consumers using older schema versions must still be able to read data written with the new version.

By following these rules, we can make sure that our Kafka consumers and producers keep working together even when our JSON schema changes.
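
If we use Confluent Schema Registry, we can also enforce a compatibility rule per subject. Here is a hedged sketch, assuming the registry set up in section 1 above runs on localhost:8081 and uses the User-value subject we registered there:

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     --data '{"compatibility": "BACKWARD"}' \
     http://localhost:8081/config/User-value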

5. Testing and Validation

We should regularly check our JSON data against the expected schema. We can use tools like Apache Avro or JSON Schema Validator. This helps us make sure that the data follows the schema defined in our registry.
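
For example, here is a minimal sketch using the Python jsonschema package (one possible validator; any JSON Schema tool works) to check a record before it is sent to Kafka:

from jsonschema import validate, ValidationError

user_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "integer"}
    },
    "required": ["name"]
}

record = {"name": "John Doe", "age": 30}

try:
    validate(instance=record, schema=user_schema)  # raises ValidationError on mismatch
    print("Record matches the schema")
except ValidationError as err:
    print(f"Invalid record: {err.message}")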

For more information on how to connect Kafka with Spark, check this article. By using these strategies, we can manage schema evolution in JSON data while using Kafka and Spark Structured Streaming.

Conclusion

In this guide, we looked at how to read records in JSON format from Kafka using Structured Streaming. We covered the important steps, from setting up the Kafka environment to processing and writing JSON data with Spark DataFrames.

This makes real-time data processing easier and helps us deal with schema evolution in JSON data.

For more information, we can check our articles on Kafka consumer settings and integrating Spark with Kafka. These articles can help us to improve our data streaming skills.
