Apache Pulsar and Spring Boot | wranto

0

 Hello everyone today I am going to talk about Apache pulsar

Are you thinking of some kind of motorbike, then this is not what we are going to discuss today.

What is Apache Pulsar?

Apache Pulsar, a next-generation messaging system, offers a powerful and scalable solution for building real-time applications. So what does it offer?

  • Apache Pulsar can be used as messaging or streaming platform with less than 10ms latency.
  • Multi-tenancy support
  • Cross region geo-replication
  • Apache Pulsar can support upto 1 million unique topics.

 Pulsar Architecture
  

When combined with Spring Boot, a popular Java framework, Apache Pulsar becomes even more accessible and developer-friendly. 

In this blog post, we will explore how to integrate Apache Pulsar with Spring Boot and leverage its capabilities to build robust and scalable message-driven applications.

1. Understanding Apache Pulsar:

Apache Pulsar is a distributed pub-sub messaging system that provides seamless scalability and low latency. It offers a highly available architecture with the ability to handle millions of messages per second. 
Apache Pulsar provides features like message durability, automatic partitioning, and multi-tenancy, making it an ideal choice for building event-driven and real-time applications.

2. Setting up Apache pulsar cluster locally:

I will be using docker to setup the apache pulsar cluster locally.
docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:3.0.0 bin/pulsar standalone
If you want to modify some of the default configuration options of apache pulsar then use below command to change it.
  docker run -it -e PULSAR_PREFIX_xxx=yyy -p 6650:6650  -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.10.0 sh -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone"
  
The list of configurations options is at Pulsar config options

3. Setting up Apache Pulsar with Spring Boot:

To integrate Apache Pulsar with Spring Boot, we can leverage the "spring-pulsar-spring-boot-starter" library. This library provides a set of auto-configurations and utilities that simplify the integration process. Start by including the necessary Maven or Gradle dependencies in your Spring Boot project.

Maven dependency:

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-boot-starter</artifactId>
        <version>0.2.1-SNAPSHOT</version>
    </dependency>
</dependencies>
  
Project structure

3. Producing Messages with Apache Pulsar and Spring Boot:

With the Apache Pulsar integration in place, you can now start producing messages from your Spring Boot application. Use the `PulsarTemplate` class provided by the starter library to send messages to a Apache Pulsar topic.

PulsarProducer:

import com.wranto.pulsar.model.CustomMessage;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class MessageProducer {

    @NonNull private PulsarTemplate<CustomMessage> pulsarTemplate;  
    public void sendMessage(CustomMessage message) {
        try {
            pulsarTemplate.send("hello-pulsar-topic", message);
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }
}

CustomMessage.java:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CustomMessage {
    private int id;
    private String body;
    private long createdAt;
}
  

4. Consuming Messages with Apache Pulsar and Spring Boot:

Consuming messages from Apache Pulsar topics in a Spring Boot application is straightforward. Create a message listener by implementing the `MessageListener` interface or using the `@PulsarConsumer `annotation. The listener will be automatically registered by the Pulsar integration.

Since I am using java object as a message , I preferred to use SchemaType.JSON

PulsarConsumer:

import com.wranto.pulsar.model.CustomMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class MessageConsumer {

    @PulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic", schemaType = SchemaType.JSON)
    public void consumeMessage(CustomMessage message) {
        log.info("Message Received: id = {}, body={}, createdAt={}  ",  message.getId(), message.getBody(), message.getCreatedAt() );
    }
}

SpringBootPulsarApplication.java:

import com.wranto.pulsar.model.CustomMessage;
import com.wranto.pulsar.producer.MessageProducer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@Slf4j
@SpringBootApplication
@RequiredArgsConstructor
public class SpringBootPulsarApplication implements CommandLineRunner {

	@Autowired
	 private MessageProducer messageProducer;

	public static void main(String[] args) {
		SpringApplication.run(SpringBootPulsarApplication.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		log.info("Start sending message using apache pulsar");
		messageProducer.sendMessage(new CustomMessage(1, "Hello Pulsar", System.currentTimeMillis()));
	}
}
After running the class you can see the message received in the Pulsar consumer and printed on console.
2023-06-10T19:44:48.467+02:00  INFO 24296 --- [r-client-io-1-1] o.a.pulsar.client.impl.ProducerImpl      : [hello-pulsar-topic] [standalone-12-0] Created producer on cnx [id: 0x2e2e2e25, L:/127.0.0.1:49360 - R:localhost/127.0.0.1:6650]
2023-06-10T19:44:48.665+02:00  INFO 24296 --- [ntainer#0-0-C-1] c.w.pulsar.consumer.MessageConsumer      : Message Received: id = 1, body=Hello Pulsar, createdAt=1686419088377  

Conclusion:

 Today we explored the powerful capabilities of Apache Pulsar and its easy integration with Spring Boot for building robust and scalable applications.

Post a Comment

0Comments

Please Select Embedded Mode To show the Comment System.*

Cookies Consent

This website uses cookies to offer you a better Browsing Experience. By using our website, You agree to the use of Cookies

Privacy Policy