Apache Pulsar para Iniciantes: Configuração Básica e Exemplos em Java
Introdução
A comunicação assíncrona via mensageria é um dos pilares das arquiteturas distribuídas, pois desacopla a troca de informações entre serviços de forma confiável. Nesse cenário, o Apache Pulsar vem se destacando por oferecer um ecossistema robusto que une filas de mensagens (message queuing) e streaming de dados em tempo real, com foco em alta performance e escalabilidade.
Neste post, vamos abordar:
-
Uma visão geral do Apache Pulsar e sua comparação com outras ferramentas de mensageria (Kafka, RabbitMQ e ActiveMQ).
-
Como iniciar o Apache Pulsar via Docker de forma simples e rápida.
-
Um exemplo de projeto Java que envia (produz) e recebe (consome) mensagens, ilustrando como trabalhar com threads para lidar com fluxo contínuo e aplicar delay entre envios.
-
Uso de DLQ (Dead Letter Queue) diretamente no exemplo de consumo para tratar mensagens que falharem repetidamente.
Repositório com o código fonte
Os exemplos utilizados neste post podem ser encontrados no repositório GitHub a seguir: https://github.com/gasparbarancelli/demo-apache-pulsar
Comparando Apache Pulsar com Kafka, RabbitMQ e ActiveMQ
Apache Kafka
-
Vantagens: Grande throughput, ecossistema maduro (Kafka Streams, Confluent Platform), logs imutáveis de retenção de dados.
-
Desafios: A escalabilidade de armazenamento e processamento exige atenção a custos e complexidade de configuração.
RabbitMQ
-
Vantagens: Mensageria tradicional (AMQP), fácil integração com microserviços, boa para cenários de filas de tarefas.
-
Desafios: Não é otimizada para streaming contínuo em altíssimo volume ou retenção prolongada das mensagens.
ActiveMQ
-
Vantagens: Utiliza JMS (Java Message Service), muito adotada em aplicações corporativas e com suporte a múltiplos protocolos.
-
Desafios: Para grandes volumes de dados em tempo real, pode demandar ajustes de performance e escalabilidade.
Apache Pulsar
-
Diferenciais:
-
Combina mensageria e streaming nativamente.
-
Escalabilidade elástica, separando a camada de armazenamento (BookKeeper) da camada de processamento (brokers).
-
Multi-tenancy e alta disponibilidade embutidos.
-
Suporta reprocessamento de mensagens, histórico de dados e outras funcionalidades avançadas de streaming.
-
DLQ (Dead Letter Queue) nativo, para onde vão as mensagens que falham repetidamente no processamento.
-
Iniciando o Apache Pulsar via Docker
Uma forma ágil de executar o Pulsar localmente é usar containers Docker. Siga os passos:
-
Baixar a imagem:
docker pull apachepulsar/pulsar:latest
-
Executar em modo standalone:
docker run -it \ -p 6650:6650 \ -p 8080:8080 \ apachepulsar/pulsar:latest \ bin/pulsar standalone
Porta 6650: comunicação binária com os clients. Porta 8080: API REST e painel administrativo.
Pronto! Você terá um cluster standalone em execução localmente, pronto para receber conexões em pulsar://localhost:6650
.
Exemplo de Projeto Java: Consumindo e Produzindo Mensagens
ConsumerDemo.java
package com.gasparbarancelli;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
public class ConsumerDemo extends Thread {
private final PulsarClient client;
private final Consumer<byte[]> consumer;
private volatile boolean running = true;
public ConsumerDemo() throws Exception {
// 1) Criamos um client do Pulsar apontando para pulsar://localhost:6650
client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// 2) Definimos o tópico, o tipo de assinatura (Exclusive) e a política de DLQ
consumer = client.newConsumer()
.topic("gasparbarancelli-demo-pulsar")
.subscriptionName("demo-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.deadLetterPolicy(
DeadLetterPolicy.builder()
.maxRedeliverCount(3) // após 3 tentativas, mensagem vai para DLQ
.deadLetterTopic("dlq-gasparbarancelli-demo-pulsar")
.build()
)
.subscribe();
}
public static void main(String[] args) throws Exception {
// 3) Criamos e iniciamos a thread do consumidor
ConsumerDemo consumerThread = new ConsumerDemo();
consumerThread.start();
// 4) Mantemos a execução por 10 segundos (para demonstração)
Thread.sleep(10000);
// 5) Encerramos a thread de consumo
consumerThread.shutdown();
}
@Override
public void run() {
while (running) {
try {
// 6) Recebe a mensagem do tópico configurado
Message<byte[]> msg = consumer.receive();
String content = new String(msg.getData());
// 7) Exibe o conteúdo recebido no console
System.out.println("Recebido: " + content);
// 8) Aqui, poderíamos testar alguma condição de erro; se falhar:
// consumer.negativeAcknowledge(msg);
// 9) Se processar com sucesso, confirmamos a mensagem
consumer.acknowledge(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void shutdown() {
running = false;
try {
// 10) Fecha o consumer e o cliente
consumer.close();
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
O que está acontecendo aqui?
-
Criação do cliente: conectamos ao broker local (
pulsar://localhost:6650
). -
Assinatura do tópico + DLQ: além do nome do tópico (`gasparbarancelli-demo-pulsar) e do tipo de assinatura (`Exclusive), definimos uma política de DLQ. Se uma mensagem for negatively acknowledged (negativeAcknowledge) mais de 3 vezes, ela será encaminhada para o tópico `dlq-gasparbarancelli-demo-pulsar.
-
Thread em loop: no método
run()
, chamamos `consumer.receive() continuamente para escutar mensagens em tempo real. -
Tratamento de falha: caso ocorra um erro e chamarmos
consumer.negativeAcknowledge(msg), o Pulsar irá reenviar essa mensagem até atingir o `maxRedeliverCount
. Ao estourar esse limite, a mensagem vai para a DLQ. -
Confirmação de recebimento: se tudo der certo no processamento, chamamos
consumer.acknowledge(msg)
para que a mensagem não seja reenviada. -
Encerramento: quando `shutdown() é invocado, fechamos o consumer e o cliente, liberando os recursos.
Por que usar DLQ?
A DLQ evita que mensagens problemáticas (que não podem ser processadas por um motivo específico) fiquem em um loop infinito de tentativas. Assim, você pode tratar mensagens com erro em um fluxo separado, garantindo maior robustez ao sistema.
ProducerDemo.java
package com.gasparbarancelli;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
public class ProducerDemo extends Thread {
private final PulsarClient client;
private final Producer<byte[]> producer;
private volatile boolean running = true;
public ProducerDemo() throws Exception {
// 1) Criamos um client do Pulsar apontando para pulsar://localhost:6650
client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// 2) Criamos o producer apontando para o mesmo tópico do consumidor
producer = client.newProducer()
.topic("gasparbarancelli-demo-pulsar")
.create();
}
public static void main(String[] args) throws Exception {
// 3) Criamos e iniciamos a thread do produtor
ProducerDemo producerThread = new ProducerDemo();
producerThread.start();
// 4) Mantemos a execução por 10 segundos (para demonstração)
Thread.sleep(10000);
// 5) Encerramos a thread de produção
producerThread.shutdown();
}
@Override
public void run() {
int count = 0;
while (running) {
try {
// 6) Montamos a mensagem com um contador
String message = "Mensagem " + count + " com delay de 1s";
// 7) Enviamos a mensagem ao Pulsar
MessageId msgId = producer.send(message.getBytes());
System.out.println("Enviado: " + message + " | ID: " + msgId);
// 8) Incrementamos o contador e aguardamos 1 segundo
count++;
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void shutdown() {
running = false;
try {
// 9) Fecha o producer e o client
producer.close();
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
O que está acontecendo aqui?
-
Criação do cliente e producer: similar ao consumer, mas focado no envio de mensagens.
-
Envio periódico: em run()`, a cada iteração, criamos uma mensagem, a enviamos e aguardamos 1 segundo antes de prosseguir.
-
Exibição de logs: imprimimos no console o ID da mensagem retornado pelo broker e o conteúdo enviado.
-
Encerramento:
shutdown()
fecha as conexões e libera recursos.
Conclusão
O Apache Pulsar é uma solução poderosa que combina mensageria e streaming em uma única plataforma. Sua arquitetura avançada, incluindo suporte nativo a DLQ, o torna ideal para sistemas distribuídos de alta performance. Experimente o Pulsar em seu próximo projeto!