[SOLVED] How to Dynamically Add Topics to Your @KafkaListener at Runtime - Kafka

In Apache Kafka applications, we often need to manage topics and subscriptions at runtime. This matters when we build flexible and responsive data apps. In this chapter, we will see how to make a @KafkaListener consume from topics that are added while the app is running, so the app can adapt to changes without a restart. We will use Spring Kafka and the Kafka Admin client to manage topics and subscriptions.

We will look at these solutions for adding topics to your @KafkaListener at runtime:

  • Understanding @KafkaListener and Topic Subscription: Here, we will give a simple overview of how @KafkaListener works with Kafka topics.
  • Using Kafka Admin to Manage Topics: We will show steps to use Kafka Admin to create and delete topics using code.
  • Dynamic Topic Subscription with ApplicationListener: We will explain how to use ApplicationListener to listen for events that change topic subscriptions.
  • Implementing a Custom KafkaListenerContainerFactory: We will create a custom factory to manage listener containers that can change at runtime.
  • Leveraging KafkaListenerEndpointRegistry for Runtime Changes: We will use the KafkaListenerEndpointRegistry to change listener endpoints while the app runs.
  • Handling Consumer Rebalance Events: We will discuss ways to manage consumer rebalance events during dynamic topic subscriptions.

For more reading, you may like these articles: Understanding Kafka Topics and How to Change Number of Replicas. These resources can give us more insights into managing Kafka topics and improving our app’s performance.

Part 1 - Understanding @KafkaListener and Topic Subscription

In Spring Kafka, we use the @KafkaListener annotation to make a listener for one or more Kafka topics. The annotation subscribes the listener to its topics and delivers incoming messages to the annotated method. The main parts to know when we use @KafkaListener are listener settings, topic subscriptions, and consumer group settings.

Basic Configuration

To use @KafkaListener, we need to set up a Kafka consumer factory and a listener container factory. Here is how we can do this:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "my-group");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Creating a Kafka Listener

After we set up the configuration, we can create a listener using the @KafkaListener annotation. Here is an example:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaMessageListener {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}

In this example, the listener subscribes to the topic my-topic and processes incoming messages by printing them. The groupId tells us which consumer group this listener belongs to.

Dynamic Topic Subscription

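The topics attribute of @KafkaListener is resolved once, when the application starts. If we want our listener to consume from more topics during runtime, we need to work with the listener containers programmatically. KafkaListenerEndpointRegistry gives us access to the container behind each @KafkaListener. This is very helpful for applications that need to pick up new topics depending on runtime needs.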

For more details on topics and how to manage them, we can check the article Understanding Kafka Topics and Their Subscription. This article gives us more information on managing and subscribing to Kafka topics well.

By knowing the basics of @KafkaListener and topic subscriptions, we can handle many situations in our Kafka applications.

Part 2 - Using Kafka Admin to Manage Topics

To manage topics and subscriptions in Apache Kafka at runtime, we can use the Kafka Admin Client. The Admin Client helps us create, delete, and change Kafka topics easily. This gives us a good way to manage our Kafka setup. By adding Kafka Admin to our Spring application, we can handle topic management together with our Kafka listeners.

Setting Up Kafka Admin

First, we need to make sure we have the right Kafka dependencies in our pom.xml or build.gradle file:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.0</version>
</dependency>

Configuring Kafka Admin

Next, we need to set up the Kafka Admin bean in our Spring application. Here is an example of how to configure it:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaAdminConfig {

    @Bean
    public AdminClient kafkaAdmin() {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return AdminClient.create(config);
    }
}

Creating a New Topic

We can create a new topic using the Kafka Admin client. Here is a method to create a topic with some settings:

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.AdminClient;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

public void createTopic(AdminClient adminClient, String topicName, int partitions, short replicationFactor) throws ExecutionException, InterruptedException {
    NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
    adminClient.createTopics(Collections.singleton(newTopic)).all().get();
}
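Before calling createTopics, it can help to check the topic name locally, since the broker rejects invalid names. The following is a minimal sketch that assumes Kafka's documented naming rules: ASCII letters, digits, '.', '_' and '-', at most 249 characters, and neither "." nor "..". The TopicNames helper is hypothetical and is not part of Kafka or Spring Kafka.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Kafka or Spring Kafka
final class TopicNames {
    private static final int MAX_LENGTH = 249;
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    private TopicNames() {
    }

    /** Returns true if the name satisfies the naming rules assumed above. */
    static boolean isValid(String name) {
        if (name == null || name.isEmpty() || name.length() > MAX_LENGTH) {
            return false;
        }
        if (name.equals(".") || name.equals("..")) {
            return false;
        }
        return LEGAL.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("my-new-topic")); // true
        System.out.println(isValid("bad topic!"));   // false
    }
}
```

A caller could run this check first and only pass valid names to createTopic, which keeps an obviously bad name from turning into an ExecutionException later.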

Deleting a Topic

We can also delete a topic using the deleteTopics method:

public void deleteTopic(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
    adminClient.deleteTopics(Collections.singleton(topicName)).all().get();
}

Altering a Topic

If we need to change an existing topic, for example to increase its number of partitions, we can use the createPartitions method. Kafka only lets the partition count grow, not shrink:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

public void alterTopicPartitions(AdminClient adminClient, String topicName, int newPartitions) throws ExecutionException, InterruptedException {
    // increaseTo takes the new total number of partitions, not the number to add
    adminClient.createPartitions(Collections.singletonMap(topicName, NewPartitions.increaseTo(newPartitions))).all().get();
}

Properties and Configurations

When we create or change topics, we might want to set extra properties. This can include things like retention and cleanup policies. We can do this by calling the configs method on a NewTopic before we create it, or by using the incrementalAlterConfigs method for an existing topic.
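As a small illustration of such properties, the sketch below builds a map with the retention.ms and cleanup.policy topic settings. NewTopic accepts a Map<String, String> through its configs method, so a map like this could be passed there. The TopicConfigs helper and its parameters are assumptions made for this example.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; only the config keys come from Kafka
final class TopicConfigs {
    private TopicConfigs() {
    }

    /** Builds topic-level settings for retention time and cleanup policy. */
    static Map<String, String> of(Duration retention, boolean compacted) {
        Map<String, String> configs = new LinkedHashMap<>();
        // retention.ms is expressed in milliseconds
        configs.put("retention.ms", Long.toString(retention.toMillis()));
        // cleanup.policy is "delete" for time-based removal or "compact" for log compaction
        configs.put("cleanup.policy", compacted ? "compact" : "delete");
        return configs;
    }

    public static void main(String[] args) {
        // prints {retention.ms=604800000, cleanup.policy=delete}
        System.out.println(of(Duration.ofDays(7), false));
    }
}
```

With the createTopic method shown earlier, the result would be applied with something like new NewTopic(topicName, partitions, replicationFactor).configs(TopicConfigs.of(Duration.ofDays(7), false)).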

Example Usage

Here is how we can use these methods in our Spring service:

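import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.concurrent.ExecutionException;

@Service
public class KafkaTopicService {

    private final AdminClient adminClient;

    public KafkaTopicService(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    public void manageTopics() throws ExecutionException, InterruptedException {
        createTopic(adminClient, "my-new-topic", 3, (short) 1);
        // Do other topic management stuff
    }

    // Same createTopic method as shown earlier, placed in this class so the call above compiles
    public void createTopic(AdminClient adminClient, String topicName, int partitions, short replicationFactor) throws ExecutionException, InterruptedException {
        adminClient.createTopics(Collections.singleton(new NewTopic(topicName, partitions, replicationFactor))).all().get();
    }
}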

Conclusion

Using the Kafka Admin client helps us manage topics in our Kafka setup. This allows our application to adjust to changing needs effectively. For more details on Kafka topics, you can check this Kafka Topics Overview.

By using the Kafka Admin features, we can make sure our application stays flexible and responsive to the needs of our data streaming setup.

Part 3 - Dynamic Topic Subscription with ApplicationListener

We can make our @KafkaListener subscribe to topics at runtime. To do this, we will use the Spring ApplicationListener interface. This way, we can listen for events in our application and change the topics our Kafka listener subscribes to.

Step-by-Step Implementation

  1. Create a Custom Event: First, we need to define a custom event. This event will hold information about topic changes like when we add or remove topics.

    import org.springframework.context.ApplicationEvent;
    
    public class TopicChangeEvent extends ApplicationEvent {
        private final String topic;
    
        public TopicChangeEvent(Object source, String topic) {
            super(source);
            this.topic = topic;
        }
    
        public String getTopic() {
            return topic;
        }
    }
  2. Publish Events: Next, we will create a method to publish this event. We do this when we want to change the topics.

    import org.springframework.context.ApplicationEventPublisher;
    import org.springframework.stereotype.Component;
    
    @Component
    public class TopicPublisher {
        private final ApplicationEventPublisher publisher;
    
        public TopicPublisher(ApplicationEventPublisher publisher) {
            this.publisher = publisher;
        }
    
        public void addTopic(String topic) {
            publisher.publishEvent(new TopicChangeEvent(this, topic));
        }
    
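        public void removeTopic(String topic) {
            // Note: TopicChangeEvent carries only the topic name, so listeners cannot tell
            // a removal from an addition unless the event is extended with an action field
            publisher.publishEvent(new TopicChangeEvent(this, topic));
        }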
    }
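  3. Implement the Listener: Now, we need to make an ApplicationListener that reacts to the events. The topics of a running listener container cannot be changed, so the listener starts an additional container for the new topic and reuses the message listener of our existing @KafkaListener.

    import org.springframework.context.ApplicationListener;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import org.springframework.stereotype.Component;

    @Component
    public class TopicChangeListener implements ApplicationListener<TopicChangeEvent> {
        private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
        private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

        public TopicChangeListener(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry,
                                   ConcurrentKafkaListenerContainerFactory<String, String> factory) {
            this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
            this.factory = factory;
        }

        @Override
        public void onApplicationEvent(TopicChangeEvent event) {
            // We assume we have a listener id here
            MessageListenerContainer existing = kafkaListenerEndpointRegistry.getListenerContainer("yourListenerId");
            if (existing == null) {
                return;
            }
            // Containers created this way are not added to the registry
            ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(event.getTopic());
            container.getContainerProperties().setGroupId(existing.getGroupId());
            container.getContainerProperties().setMessageListener(existing.getContainerProperties().getMessageListener());
            container.start();

            // Add logic for removal if needed, e.g. keep a reference to the container and stop it
        }
    }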
  4. Configuring the Kafka Listener: The topics attribute can use a SpEL expression, but it is evaluated only once, when the listener is created. Here __listener refers to the listener bean itself, so the bean must expose a topics property.

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class MyKafkaListener {
        // Read once at startup through #{__listener.topics}
        public String[] getTopics() {
            return new String[] {"initialTopic"};
        }

        @KafkaListener(id = "yourListenerId", topics = "#{__listener.topics}")
        public void listen(String message) {
            // Handle incoming message
            System.out.println("Received message: " + message);
        }
    }

Summary of Key Steps

  • We define a custom event for topic changes.
  • We create a publisher for topic change events.
  • We implement an ApplicationListener to respond to these events and update our Kafka listener.
  • We ensure our Kafka listener is set up for dynamic subscriptions.

With this setup, we can subscribe to new topics at runtime using the Spring framework. This makes our Kafka listener flexible and able to meet the changing needs of our application. For more details on Kafka topics, we can check out Understanding Kafka Topics.
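The publisher above sends the same event for additions and removals, so the receiving side needs some way to tell them apart and to ignore duplicates. The following is a minimal, framework-free sketch of that bookkeeping. TopicTracker and its Action enum are hypothetical names introduced for this example and are not Spring Kafka types.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bookkeeping class for illustration; not a Spring Kafka type
final class TopicTracker {
    enum Action { ADD, REMOVE }

    // Thread-safe set, because events may arrive from different threads
    private final Set<String> topics = ConcurrentHashMap.newKeySet();

    /** Applies a change and reports whether the set of topics actually changed. */
    boolean apply(Action action, String topic) {
        return action == Action.ADD ? topics.add(topic) : topics.remove(topic);
    }

    /** Returns an immutable snapshot of the topics currently tracked. */
    Set<String> current() {
        return Set.copyOf(topics);
    }

    public static void main(String[] args) {
        TopicTracker tracker = new TopicTracker();
        System.out.println(tracker.apply(Action.ADD, "orders"));    // true
        System.out.println(tracker.apply(Action.ADD, "orders"));    // false, already tracked
        System.out.println(tracker.apply(Action.REMOVE, "orders")); // true
    }
}
```

An event listener could call apply first and only start or stop a listener container when it returns true, which avoids starting two containers for the same topic.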

Part 4 - Implementing a Custom KafkaListenerContainerFactory

To add topics to our @KafkaListener while the program runs, we may need to make a custom KafkaListenerContainerFactory. This gives us more control over how the listener is set up and how it works. We can change the subscribed topics as we need.

Step 1: Define Custom KafkaListenerContainerFactory

We can create a custom KafkaListenerContainerFactory by declaring our own ConcurrentKafkaListenerContainerFactory bean. In this custom factory, we can set up the consumer settings, the concurrency, and the startup behavior.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
// KafkaProperties comes from Spring Boot's Kafka auto-configuration
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomKafkaListenerFactory {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> customKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // We can change concurrency as we need
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

Step 2: Create a Dynamic Subscription Method

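To let us add topics while the program runs, we can make a method in our service. Spring Kafka fixes the topics of a container when the container is created. So this method looks up the current listener in the KafkaListenerEndpointRegistry and starts an additional container for the new topic that reuses its message listener.

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

@Service
public class KafkaDynamicTopicService {

    private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    public KafkaDynamicTopicService(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry,
            @Qualifier("customKafkaListenerContainerFactory") ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
        this.factory = factory;
    }

    public void addTopicToListener(String listenerId, String newTopic) {
        MessageListenerContainer existing = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);
        if (existing != null) {
            // Containers created this way are not added to the registry
            ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(newTopic);
            container.getContainerProperties().setGroupId(existing.getGroupId());
            container.getContainerProperties().setMessageListener(existing.getContainerProperties().getMessageListener());
            container.start(); // Start consuming from the new topic
        }
    }
}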

Step 3: Annotate Your Listener

We need to make sure our listener is annotated correctly to use the custom factory.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaListener {

    @KafkaListener(id = "myListener", topics = "initialTopic", containerFactory = "customKafkaListenerContainerFactory")
    public void listen(String message) {
        System.out.println("Received Message: " + message);
    }
}

Usage Example

Now we can add new topics to our listener while the program runs by calling the addTopicToListener method:

@Autowired
private KafkaDynamicTopicService kafkaDynamicTopicService;

// Adding new topic at runtime
kafkaDynamicTopicService.addTopicToListener("myListener", "newTopic");

This setup lets our @KafkaListener change to new topics while the program runs. This makes our Kafka consumer design more flexible. We can read more about understanding Kafka topics to get better insights on how topics work in Kafka.

Part 5 - Leveraging KafkaListenerEndpointRegistry for Runtime Changes

To manage our Kafka listeners while the application is running, we can use the KafkaListenerEndpointRegistry. This registry helps us change listener endpoints easily. We can add or remove topics without needing to restart our application. Here are the main steps to do this:

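  1. Inject KafkaListenerEndpointRegistry: First, we need to inject the KafkaListenerEndpointRegistry into our service or component, together with the container factory. The registry takes care of the listener containers made by @KafkaListener.

    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.stereotype.Service;

    @Service
    public class KafkaListenerService {

        private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
        private final ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;

        public KafkaListenerService(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry,
                                    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory) {
            this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
            this.kafkaListenerContainerFactory = kafkaListenerContainerFactory;
        }
    }
  2. Modify the Listener’s Topics: The registry returns a MessageListenerContainer, and the topics of a container cannot be changed after it is created. To consume from an additional topic, we start a new container for that topic and reuse the message listener of the existing one.

    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.listener.MessageListenerContainer;

    public void updateListenerTopics(String listenerId, String newTopic) {
        MessageListenerContainer existing = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);

        if (existing != null) {
            // Containers created this way are not added to the registry
            ConcurrentMessageListenerContainer<String, String> container = kafkaListenerContainerFactory.createContainer(newTopic);
            container.getContainerProperties().setGroupId(existing.getGroupId());
            container.getContainerProperties().setMessageListener(existing.getContainerProperties().getMessageListener());
            container.start(); // Start consuming from the new topic
        }
    }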
  3. Starting and Stopping Listeners: We can start or stop listeners as needed using the registry. This is helpful when we want to turn off a listener for a while before we make changes.

    public void stopListener(String listenerId) {
        kafkaListenerEndpointRegistry.getListenerContainer(listenerId).stop();
    }
    
    public void startListener(String listenerId) {
        kafkaListenerEndpointRegistry.getListenerContainer(listenerId).start();
    }
  4. Example Usage: Here is how we can use these methods in our application.

    // Example method to add a new topic to an existing listener
    public void addTopicToListener(String listenerId, String newTopic) {
        stopListener(listenerId); // Stop the listener before updating
        updateListenerTopics(listenerId, newTopic); // Update topics
        startListener(listenerId); // Restart the listener
    }
  5. Configuration Properties: Make sure our application uses the right Kafka properties in the application.yml or application.properties file.

    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: your-group-id
          auto-offset-reset: latest
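The auto-offset-reset setting above only matters when the consumer group has no committed offset for a partition. The sketch below illustrates that decision in plain Java. StartOffsets and its Reset enum are hypothetical names for this example, and the sketch ignores the case where a committed offset is out of range.

```java
import java.util.OptionalLong;

// Hypothetical illustration of how a start position is chosen; not Kafka client code
final class StartOffsets {
    enum Reset { EARLIEST, LATEST }

    private StartOffsets() {
    }

    /**
     * Returns the offset to start reading from: the committed offset when one
     * exists, otherwise the beginning or end of the partition depending on the policy.
     */
    static long resolve(OptionalLong committed, long beginningOffset, long endOffset, Reset reset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        return reset == Reset.EARLIEST ? beginningOffset : endOffset;
    }

    public static void main(String[] args) {
        // A group that already committed offset 42 resumes there, whatever the policy
        System.out.println(resolve(OptionalLong.of(42), 0, 100, Reset.LATEST));    // 42
        // A new group with "latest" skips the existing records
        System.out.println(resolve(OptionalLong.empty(), 0, 100, Reset.LATEST));   // 100
        // A new group with "earliest" reads the partition from the start
        System.out.println(resolve(OptionalLong.empty(), 0, 100, Reset.EARLIEST)); // 0
    }
}
```

This matters for topics added at runtime: a container that starts on a brand-new topic with a new group id and the latest policy will not see records produced before it joined.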

By using the KafkaListenerEndpointRegistry, we can manage topic subscriptions for our Kafka listeners while the application runs. This makes our application more flexible and responsive. We do not need to restart the application and we can easily work with changing topics.

For more information on Kafka listener settings, we can check the articles on understanding Kafka topics and on Kafka consumer settings.

Part 6 - Handling Consumer Rebalance Events

When we work with topic subscriptions in Apache Kafka using the @KafkaListener annotation, we must manage consumer rebalance events. Rebalances happen when a new consumer joins or leaves a Kafka consumer group. They can also occur when the number of partitions for a subscribed topic changes. It is important to handle these events. This helps us keep consumer behavior good and ensures message processing is correct.

To handle consumer rebalance events, we can use the ConsumerRebalanceListener interface. This interface has two methods: onPartitionsRevoked and onPartitionsAssigned. The first method runs before partitions leave the consumer. The second method runs after partitions are given to the consumer. By using these methods, we can manage offsets and state well during rebalances.
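To see why these callbacks fire, it can help to simulate what a rebalance does to partition ownership. The sketch below spreads partitions over consumers in simple round-robin order. It is a simplification for illustration only and is not the assignment strategy Kafka actually runs; RebalanceSketch and the consumer names are made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of partition assignment; not Kafka's real assignor
final class RebalanceSketch {
    private RebalanceSketch() {
    }

    /** Spreads partitions 0..partitionCount-1 over the consumers in round-robin order. */
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // A single consumer owns every partition
        System.out.println(assign(List.of("c1"), 4));       // {c1=[0, 1, 2, 3]}
        // After a second consumer joins, c1 gives up partitions 1 and 3
        System.out.println(assign(List.of("c1", "c2"), 4)); // {c1=[0, 2], c2=[1, 3]}
    }
}
```

In this model, c1 would see partitions 1 and 3 in its revoked callback, and c2 would see them in its assigned callback.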

Here’s how to create a consumer rebalance listener:

  1. Implement the Rebalance Listener: Spring Kafka's ConsumerAwareRebalanceListener extends ConsumerRebalanceListener and passes the current consumer to each callback, so we do not need to hold a consumer reference ourselves.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

import java.util.Collection;
import java.util.Map;

public class CustomRebalanceListener implements ConsumerAwareRebalanceListener {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // Clean up before partitions are revoked; the container then commits any pending offsets
        System.out.println("Revoking partitions: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // Reset state for the newly assigned partitions
        System.out.println("Assigning partitions: " + partitions);
        // We can seek to a specific offset if needed. This example starts at the end of
        // each partition, which skips records produced while the partition was unassigned
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, endOffsets.get(partition));
        }
    }
}
  2. Register the Listener with Kafka Consumer:

Next, we need to register this rebalance listener with our KafkaListener setup. We can do this by changing the KafkaListenerContainerFactory.

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // Every container created by this factory uses the rebalance listener
        factory.getContainerProperties().setConsumerRebalanceListener(new CustomRebalanceListener());

        return factory;
    }
}
  3. Using the Listener in @KafkaListener:

Now, when we use the @KafkaListener annotation, the CustomRebalanceListener will manage rebalance events as we set up.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaListener {

    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    public void listen(String message) {
        System.out.println("Received message: " + message);
        // Process message
    }
}

By using the ConsumerRebalanceListener, we make sure our application can handle consumer rebalances well. This keeps message processing correct and allows us to manage topic subscriptions easily. For more understanding of Kafka’s structure and its consumer groups, check Understanding Apache Kafka.

Frequently Asked Questions

1. How can we dynamically add topics to our @KafkaListener at runtime?

To add topics to our @KafkaListener at runtime, we can use the KafkaListenerEndpointRegistry to reach the listener's container. With the Kafka AdminClient, we can create the new topics. Because the topics of a container are fixed when the container is created, we then start an additional container for the new topic that reuses the existing message listener. This way, we can manage topics without restarting our application. For more details, check our guide on dynamic topic subscription.

2. What is the role of KafkaAdmin in managing topics?

KafkaAdmin in Spring Kafka, and the Kafka AdminClient used in this chapter, let us manage Kafka topics programmatically. With the AdminClient, we can create topics, delete topics, and increase their partition count. This is useful when we want to add new topics to our @KafkaListener at runtime. For a complete overview of Kafka topics and management, see our article on understanding Apache Kafka.

3. How do we handle consumer rebalance events in Kafka?

Handling consumer rebalance events is important for keeping message processing going. We can use the ConsumerRebalanceListener interface. This helps us manage offsets and makes sure messages are not lost when rebalances happen. This is especially important when we change topics for our @KafkaListener. For more insights, look at our article on consumer groups.

4. Can we change the start offset for a Kafka consumer?

Yes, we can change the start offset for a Kafka consumer using the KafkaConsumer API. This lets us choose the offset where we want to start reading messages after we subscribe to a topic. For detailed steps on how to do this, visit our guide on changing start offset for Kafka consumers.

5. What are the best practices for configuring @KafkaListener?

Best practices for configuring @KafkaListener include setting good concurrency levels. We should also manage error handling and make sure everything is thread-safe. Additionally, managing topics dynamically can make our application more flexible. For more information on how to configure consumer settings well, check our article on configuring consumer settings in Kafka.
