RabbitMQ é um message broker open-source que suporta múltiplos protocolos de mensageria. Ele permite que diferentes aplicações se comuniquem através de filas, promovendo desacoplamento e processamento assíncrono entre sistemas.
1. Desacoplamento
Desacoplamento refere-se à separação de componentes distintos de um sistema, permitindo que sejam desenvolvidos e implantados independentemente. RabbitMQ alcança isso utilizando filas de mensagesn: produtores e consumidores não precisam conhecer a existência um do outro diretamente.
2. Melhoria de Performance
RabbitMQ aumenta a capacidade de resposta do sistema através do processamento assíncrono. Produtores podem enviar mensagens para uma fila, enquanto consumidores processam essas mensagens em segundo plano, elevando a performance geral.
3. Suavização de Picos
Suavização de picos equilibra requisições durante alta carga utilizando filas de mensagens. RabbitMQ distribui requisições repentinas ao longo do tempo, prevenindo sobrecarga do sistema.
4. Distribuição
RabbitMQ oferece múltiplos modos de distribuição de mensagens, incluindo ponto a ponto e publish/subscribe. É possível escolher o padrão adequado conforme a necessidade de distribuição das mensagens.
Instalação do RabbitMQ
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
Acese http://localhost:15672 com usuário e senha guest.
Exemplo Básico
1. Preparação do ambiente
go get github.com/streadway/amqp
2. Código do produtor
package main
import (
"log"
"strconv"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"fila_tarefas",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
for i := 0; i < 10; i++ {
corpo := "Tarefa " + strconv.Itoa(i)
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(corpo),
})
if err != nil {
log.Fatalf("Falha ao publicar: %s", err)
}
log.Printf("Enviado: %s", corpo)
}
}
3. Código do consumidor
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"fila_tarefas",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
msgs, err := ch.Consume(
q.Name,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao registrar consumidor: %s", err)
}
go func() {
for d := range msgs {
log.Printf("Recebido: %s", d.Body)
time.Sleep(2 * time.Second)
log.Printf("Processado: %s", d.Body)
d.Ack(false)
}
}()
log.Println("Aguardando mensagens. Ctrl+C para sair")
select {}
}
Execução do exemplo
- Inicie o RabbitMQ.
- Execute o consumidor.
- Execute o produtor.
Modo Simples: Produtor e Consumidor
Preparação
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
go get github.com/streadway/amqp
Produtor
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"fila_simples",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
corpo := "Olá, RabbitMQ!"
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(corpo),
})
if err != nil {
log.Fatalf("Falha ao publicar: %s", err)
}
log.Printf("Enviado: %s", corpo)
}
Consumidor
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"fila_simples",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao registrar consumidor: %s", err)
}
go func() {
for d := range msgs {
log.Printf("Recebido: %s", d.Body)
}
}()
log.Println("Aguardando mensagens. Ctrl+C para sair")
select {}
}
Execução
- Inicie o RabbitMQ.
- Execute o consumidor primeiro.
- Execute o produtor.
Modo de Trabalho: Confirmação Manual
Preparação
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
go get github.com/streadway/amqp
Produtor
package main
import (
"log"
"strconv"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"fila_trabalho",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
for i := 0; i < 10; i++ {
corpo := "Tarefa " + strconv.Itoa(i)
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(corpo),
})
if err != nil {
log.Fatalf("Falha ao publicar: %s", err)
}
log.Printf("Enviado: %s", corpo)
}
}
Consumidor
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"fila_trabalho",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
msgs, err := ch.Consume(
q.Name,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao registrar consumidor: %s", err)
}
go func() {
for d := range msgs {
log.Printf("Recebido: %s", d.Body)
time.Sleep(2 * time.Second)
log.Printf("Processado: %s", d.Body)
d.Ack(false)
}
}()
log.Println("Aguardando mensagens. Ctrl+C para sair")
select {}
}
Execução
- Inicie o RabbitMQ.
- Execute o consumidor.
- Execute o produtor.
Modo Publish/Subscribe (Fanout)
Preparação
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
go get github.com/streadway/amqp
Publicador
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
err = ch.ExchangeDeclare(
"logs",
"fanout",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar exchange: %s", err)
}
corpo := "Hello, RabbitMQ!"
if len(os.Args) > 1 {
corpo = os.Args[1]
}
err = ch.Publish(
"logs",
"",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(corpo),
})
if err != nil {
log.Fatalf("Falha ao publicar: %s", err)
}
log.Printf("Enviado: %s", corpo)
}
Consumidor
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
err = ch.ExchangeDeclare(
"logs",
"fanout",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar exchange: %s", err)
}
q, err := ch.QueueDeclare(
"",
false,
false,
true,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
err = ch.QueueBind(
q.Name,
"",
"logs",
false,
nil)
if err != nil {
log.Fatalf("Falha ao vincular: %s", err)
}
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao registrar consumidor: %s", err)
}
go func() {
for d := range msgs {
log.Printf("Recebido: %s", d.Body)
}
}()
log.Println("Aguardando mensagens. Ctrl+C para sair")
select {}
}
Execução
- Inicie o RabbitMQ.
- Execute o consumidor.
- Execute o publicador com argumento opcional.
Modo Roteamento (Direct)
Preparação
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
go get github.com/streadway/amqp
Produtor
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
err = ch.ExchangeDeclare(
"direct_logs",
"direct",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar exchange: %s", err)
}
if len(os.Args) < 3 {
log.Fatalf("Uso: %s <severidade> <mensagem>", os.Args[0])
}
severidade := os.Args[1]
corpo := os.Args[2]
err = ch.Publish(
"direct_logs",
severidade,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(corpo),
})
if err != nil {
log.Fatalf("Falha ao publicar: %s", err)
}
log.Printf("Enviado [%s] %s", severidade, corpo)
}
Consumidor
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
err = ch.ExchangeDeclare(
"direct_logs",
"direct",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar exchange: %s", err)
}
q, err := ch.QueueDeclare(
"",
false,
false,
true,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
severidade := "info"
err = ch.QueueBind(
q.Name,
severidade,
"direct_logs",
false,
nil)
if err != nil {
log.Fatalf("Falha ao vincular: %s", err)
}
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao registrar consumidor: %s", err)
}
go func() {
for d := range msgs {
log.Printf("Recebido [%s] %s", d.RoutingKey, d.Body)
}
}()
log.Println("Aguardando mensagens. Ctrl+C para sair")
select {}
}
Execução
- Inicie o RabbitMQ.
- Execute o consumidor.
- Execute o produtor com severidade e mensagem.
Modo Tópico e RPC
Tópico: Produtor
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
err = ch.ExchangeDeclare(
"topic_logs",
"topic",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar exchange: %s", err)
}
if len(os.Args) < 3 {
log.Fatalf("Uso: %s <routing_key> <mensagem>", os.Args[0])
}
routingKey := os.Args[1]
corpo := os.Args[2]
err = ch.Publish(
"topic_logs",
routingKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(corpo),
})
if err != nil {
log.Fatalf("Falha ao publicar: %s", err)
}
log.Printf("Enviado [%s] %s", routingKey, corpo)
}
Tópico: Consumidor
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
err = ch.ExchangeDeclare(
"topic_logs",
"topic",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar exchange: %s", err)
}
q, err := ch.QueueDeclare(
"",
false,
false,
true,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
bindingKey := "#.info"
err = ch.QueueBind(
q.Name,
bindingKey,
"topic_logs",
false,
nil)
if err != nil {
log.Fatalf("Falha ao vincular: %s", err)
}
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao registrar consumidor: %s", err)
}
go func() {
for d := range msgs {
log.Printf("Recebido [%s] %s", d.RoutingKey, d.Body)
}
}()
log.Println("Aguardando mensagens. Ctrl+C para sair")
select {}
}
Execução do Tópico
- Inicie RabbitMQ.
- Execute consumidor.
- Execute produtor com routing key e mensagem.
RPC: Servidor
package main
import (
"log"
"strconv"
"github.com/streadway/amqp"
)
func fib(n int) int {
if n <= 0 {
return 0
}
if n == 1 {
return 1
}
return fib(n-1) + fib(n-2)
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"rpc_queue",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao registrar consumidor: %s", err)
}
log.Println("Aguardando requisições RPC")
for d := range msgs {
n, err := strconv.Atoi(string(d.Body))
if err != nil {
log.Printf("Erro ao converter: %s", err)
continue
}
log.Printf("Calculando fib(%d)", n)
resposta := fib(n)
ch.Publish(
"",
d.ReplyTo,
false,
false,
amqp.Publishing{
CorrelationId: d.CorrelationId,
Body: []byte(strconv.Itoa(resposta)),
})
}
}
RPC: Cliente
package main
import (
"log"
"os"
"strconv"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
replyQueue, err := ch.QueueDeclare(
"",
false,
false,
true,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila de resposta: %s", err)
}
corrId := ""
msgs, err := ch.Consume(
replyQueue.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao registrar consumidor: %s", err)
}
if len(os.Args) < 2 {
log.Fatalf("Uso: %s <n>", os.Args[0])
}
n, err := strconv.Atoi(os.Args[1])
if err != nil {
log.Fatalf("Argumento inválido: %s", os.Args[1])
}
corrId = randomString(32)
err = ch.Publish(
"",
"rpc_queue",
false,
false,
amqp.Publishing{
CorrelationId: corrId,
ReplyTo: replyQueue.Name,
Body: []byte(strconv.Itoa(n)),
})
if err != nil {
log.Fatalf("Falha ao publicar: %s", err)
}
for d := range msgs {
if d.CorrelationId == corrId {
log.Printf("Resposta: %s", d.Body)
break
}
}
}
func randomString(n int) string {
letras := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
b := make([]rune, n)
for i := range b {
b[i] = letras[i%len(letras)]
}
return string(b)
}
Execução do RPC
- Inicie RabbitMQ.
- Execute o servidor RPC.
- Execute o cliente com um número.
Confiabilidade, Persistência e Controle de Fluxo
Para garantir a confiabilidade das mensagens e evitar perda de dados, RabbitMQ oferece:
- Persistência de mensagens: mensagens sobrevivem a reinicializações.
- Limitação do consumidor: controla quantas mensagens o consumidor processa simultaneamente.
- Confirmação do consumidor: mensagem só é removida após processamento bem-sucedido.
- Expiração de mensagens: mensagens com TTL são descartadas automaticamente.
1. Persistência
Produtor com persistência
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"fila_persistente",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
if len(os.Args) < 2 {
log.Fatalf("Uso: %s <mensagem>", os.Args[0])
}
corpo := os.Args[1]
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(corpo),
DeliveryMode: amqp.Persistent,
})
if err != nil {
log.Fatalf("Falha ao publicar: %s", err)
}
log.Printf("Enviado: %s", corpo)
}
2. Limitação do consumidor (QoS)
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
_, err = ch.QueueDeclare(
"fila_persistente",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
err = ch.Qos(
1,
0,
false,
)
if err != nil {
log.Fatalf("Falha ao configurar QoS: %s", err)
}
msgs, err := ch.Consume(
"fila_persistente",
"",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao registrar consumidor: %s", err)
}
go func() {
for d := range msgs {
log.Printf("Recebido: %s", d.Body)
d.Ack(false)
}
}()
log.Println("Aguardando mensagens. Ctrl+C para sair")
select {}
}
3. Confirmação manual
O código acima já utiliza d.Ack(false) para confirmar manualmente.
4. Expiração de mensagens (TTL)
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
_, err = ch.QueueDeclare(
"fila_persistente",
true,
false,
false,
false,
amqp.Table{
"x-message-ttl": 10000,
},
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
msgs, err := ch.Consume(
"fila_persistente",
"",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao registrar consumidor: %s", err)
}
go func() {
for d := range msgs {
log.Printf("Recebido: %s", d.Body)
d.Ack(false)
}
}()
log.Println("Aguardando mensagens. Ctrl+C para sair")
select {}
}
Sistema de Alta Concorrência: Vendas, Reservas, Ingressos
Arquitetura
- Gerenciamento de estoque com banco de dados ou memória.
- RabbitMQ como fila para pedidos.
- Controle de concorrência para evitar vandas excessivas.
- Confirmação manual para garantir processamento.
1. Gerenciamento de estoque
package main
import (
"sync"
)
type Produto struct {
ID string
Quantidade int
}
var estoque = map[string]*Produto{
"produto_1": {ID: "produto_1", Quantidade: 10},
}
var mu sync.Mutex
func reduzirEstoque(id string) bool {
mu.Lock()
defer mu.Unlock()
prod, existe := estoque[id]
if !existe || prod.Quantidade <= 0 {
return false
}
prod.Quantidade--
return true
}
2. Produtor de pedidos
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func enviarPedido(id string) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
_, err = ch.QueueDeclare(
"fila_pedidos",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
err = ch.Publish(
"",
"fila_pedidos",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(id),
})
if err != nil {
log.Fatalf("Falha ao publicar: %s", err)
}
log.Printf("Pedido enviado para %s", id)
}
func main() {
if len(os.Args) < 2 {
log.Fatalf("Uso: %s <id_produto>", os.Args[0])
}
enviarPedido(os.Args[1])
}
3. Consumidor de pedidos
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
_, err = ch.QueueDeclare(
"fila_pedidos",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
msgs, err := ch.Consume(
"fila_pedidos",
"",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao registrar consumidor: %s", err)
}
go func() {
for d := range msgs {
id := string(d.Body)
log.Printf("Processando pedido para %s", id)
if reduzirEstoque(id) {
log.Printf("Compra concluída para %s", id)
d.Ack(false)
} else {
log.Printf("Falha na compra de %s: sem estoque", id)
d.Nack(false, false)
}
}
}()
log.Println("Aguardando pedidos. Ctrl+C para sair")
select {}
}
Integração com Gin + PostgreSQL + RabbitMQ
Configuração do banco de dados
CREATE DATABASE testdb;
\c testdb
CREATE TABLE registros (
id SERIAL PRIMARY KEY,
dados TEXT NOT NULL
);
1. Conexão com banco
package db
import (
"database/sql"
"log"
_ "github.com/lib/pq"
)
var DB *sql.DB
func InitDB() {
var err error
connStr := "user=seuusuario dbname=testdb sslmode=disable"
DB, err = sql.Open("postgres", connStr)
if err != nil {
log.Fatal(err)
}
if err = DB.Ping(); err != nil {
log.Fatal(err)
}
}
2. Conexão com RabbitMQ
package rabbitmq
import (
"log"
"github.com/streadway/amqp"
)
var Channel *amqp.Channel
func InitRabbitMQ() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha ao conectar: %s", err)
}
Channel, err = conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
_, err = Channel.QueueDeclare(
"fila_tarefas",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao declarar fila: %s", err)
}
}
3. Servidor HTTP com Gin
package main
import (
"encoding/json"
"net/http"
"github.com/gin-gonic/gin"
"seumodulo/db"
"seumodulo/rabbitmq"
)
type Registro struct {
Dados string `json:"dados"`
}
func main() {
db.InitDB()
rabbitmq.InitRabbitMQ()
r := gin.Default()
r.POST("/registros", func(c *gin.Context) {
var reg Registro
if err := c.ShouldBindJSON(®); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"erro": err.Error()})
return
}
corpo, _ := json.Marshal(reg)
err := rabbitmq.Channel.Publish(
"",
"fila_tarefas",
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: corpo,
DeliveryMode: amqp.Persistent,
})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"erro": "Falha ao publicar"})
return
}
c.JSON(http.StatusAccepted, gin.H{"status": "Aceito"})
})
r.Run(":8080")
}
4. Consumidor que salva no PostgreSQL
package main
import (
"encoding/json"
"log"
"github.com/streadway/amqp"
"seumodulo/db"
)
type Registro struct {
Dados string `json:"dados"`
}
func main() {
db.InitDB()
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Falha na conexão: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Falha ao abrir canal: %s", err)
}
defer ch.Close()
msgs, err := ch.Consume(
"fila_tarefas",
"",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Falha ao registrar consumidor: %s", err)
}
log.Println("Aguardando mensagens. Ctrl+C para sair")
for d := range msgs {
var reg Registro
if err := json.Unmarshal(d.Body, ®); err != nil {
log.Printf("Erro ao decodificar: %s", err)
d.Nack(false, false)
continue
}
_, err := db.DB.Exec("INSERT INTO registros(dados) VALUES($1)", reg.Dados)
if err != nil {
log.Printf("Erro ao inserir: %s", err)
d.Nack(false, false)
continue
}
log.Printf("Inserido: %s", reg.Dados)
d.Ack(false)
}
}
Sistema de Alta Concorrência com Redis e Nginx
Arquitetura
- Balanceamento de carga com Nginx.
- Limitação de taxa com Redis.
- Suavização de picos com RabbitMQ.
- Persistência com PostgreSQL.
Configuração do banco
CREATE DATABASE testdb;
\c testdb
CREATE TABLE registros (
id SERIAL PRIMARY KEY,
dados TEXT NOT NULL
);
Redis: conexão e limitação
package redisdb
import (
"context"
"github.com/go-redis/redis/v8"
)
var ctx = context.Background()
var Rdb *redis.Client
func InitRedis() {
Rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
}
func LimitTaxa(chave string, limite int) bool {
count, err := Rdb.Incr(ctx, chave).Result()
if err != nil {
return false
}
if count == 1 {
Rdb.Expire(ctx, chave, 1)
}
return count <= int64(limite)
}
Consumidor (mesmo código anterior)
// reutiliza o consumidor que insere no PostgreSQL
Servidor HTTP com limitação
package main
import (
"encoding/json"
"net/http"
"github.com/gin-gonic/gin"
"seumodulo/db"
"seumodulo/rabbitmq"
"seumodulo/redisdb"
)
type Registro struct {
Dados string `json:"dados"`
}
func main() {
db.InitDB()
redisdb.InitRedis()
rabbitmq.InitRabbitMQ()
r := gin.Default()
r.POST("/registros", func(c *gin.Context) {
var reg Registro
if err := c.ShouldBindJSON(®); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"erro": err.Error()})
return
}
if !redisdb.LimitTaxa("limite_geral", 100) {
c.JSON(http.StatusTooManyRequests, gin.H{"erro": "Muitas requisições"})
return
}
corpo, _ := json.Marshal(reg)
err := rabbitmq.Channel.Publish(
"",
"fila_tarefas",
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: corpo,
DeliveryMode: amqp.Persistent,
})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"erro": "Falha ao publicar"})
return
}
c.JSON(http.StatusAccepted, gin.H{"status": "Aceito"})
})
r.Run(":8080")
}
Balanceamento com Nginx
http {
upstream meuapp {
server localhost:8080;
server localhost:8081;
server localhost:8082;
}
server {
listen 80;
location / {
proxy_pass http://meuapp;
}
}
}