Fundamentos de Middleware: RabbitMQ na Prática

RabbitMQ é um message broker open-source que suporta múltiplos protocolos de mensageria. Ele permite que diferentes aplicações se comuniquem através de filas, promovendo desacoplamento e processamento assíncrono entre sistemas.

1. Desacoplamento

Desacoplamento refere-se à separação de componentes distintos de um sistema, permitindo que sejam desenvolvidos e implantados independentemente. RabbitMQ alcança isso utilizando filas de mensagesn: produtores e consumidores não precisam conhecer a existência um do outro diretamente.

2. Melhoria de Performance

RabbitMQ aumenta a capacidade de resposta do sistema através do processamento assíncrono. Produtores podem enviar mensagens para uma fila, enquanto consumidores processam essas mensagens em segundo plano, elevando a performance geral.

3. Suavização de Picos

Suavização de picos equilibra requisições durante alta carga utilizando filas de mensagens. RabbitMQ distribui requisições repentinas ao longo do tempo, prevenindo sobrecarga do sistema.

4. Distribuição

RabbitMQ oferece múltiplos modos de distribuição de mensagens, incluindo ponto a ponto e publish/subscribe. É possível escolher o padrão adequado conforme a necessidade de distribuição das mensagens.

Instalação do RabbitMQ

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

Acese http://localhost:15672 com usuário e senha guest.

Exemplo Básico

1. Preparação do ambiente

go get github.com/streadway/amqp

2. Código do produtor

package main

import (
    "log"
    "strconv"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "fila_tarefas",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    for i := 0; i < 10; i++ {
        corpo := "Tarefa " + strconv.Itoa(i)
        err = ch.Publish(
            "",
            q.Name,
            false,
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(corpo),
            })
        if err != nil {
            log.Fatalf("Falha ao publicar: %s", err)
        }
        log.Printf("Enviado: %s", corpo)
    }
}

3. Código do consumidor

package main

import (
    "log"
    "time"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "fila_tarefas",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    msgs, err := ch.Consume(
        q.Name,
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao registrar consumidor: %s", err)
    }

    go func() {
        for d := range msgs {
            log.Printf("Recebido: %s", d.Body)
            time.Sleep(2 * time.Second)
            log.Printf("Processado: %s", d.Body)
            d.Ack(false)
        }
    }()

    log.Println("Aguardando mensagens. Ctrl+C para sair")
    select {}
}

Execução do exemplo

  1. Inicie o RabbitMQ.
  2. Execute o consumidor.
  3. Execute o produtor.

Modo Simples: Produtor e Consumidor

Preparação

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
go get github.com/streadway/amqp

Produtor

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "fila_simples",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    corpo := "Olá, RabbitMQ!"
    err = ch.Publish(
        "",
        q.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(corpo),
        })
    if err != nil {
        log.Fatalf("Falha ao publicar: %s", err)
    }

    log.Printf("Enviado: %s", corpo)
}

Consumidor

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "fila_simples",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao registrar consumidor: %s", err)
    }

    go func() {
        for d := range msgs {
            log.Printf("Recebido: %s", d.Body)
        }
    }()

    log.Println("Aguardando mensagens. Ctrl+C para sair")
    select {}
}

Execução

  1. Inicie o RabbitMQ.
  2. Execute o consumidor primeiro.
  3. Execute o produtor.

Modo de Trabalho: Confirmação Manual

Preparação

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
go get github.com/streadway/amqp

Produtor

package main

import (
    "log"
    "strconv"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "fila_trabalho",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    for i := 0; i < 10; i++ {
        corpo := "Tarefa " + strconv.Itoa(i)
        err = ch.Publish(
            "",
            q.Name,
            false,
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(corpo),
            })
        if err != nil {
            log.Fatalf("Falha ao publicar: %s", err)
        }
        log.Printf("Enviado: %s", corpo)
    }
}

Consumidor

package main

import (
    "log"
    "time"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "fila_trabalho",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    msgs, err := ch.Consume(
        q.Name,
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao registrar consumidor: %s", err)
    }

    go func() {
        for d := range msgs {
            log.Printf("Recebido: %s", d.Body)
            time.Sleep(2 * time.Second)
            log.Printf("Processado: %s", d.Body)
            d.Ack(false)
        }
    }()

    log.Println("Aguardando mensagens. Ctrl+C para sair")
    select {}
}

Execução

  1. Inicie o RabbitMQ.
  2. Execute o consumidor.
  3. Execute o produtor.

Modo Publish/Subscribe (Fanout)

Preparação

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
go get github.com/streadway/amqp

Publicador

package main

import (
    "log"
    "os"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "logs",
        "fanout",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar exchange: %s", err)
    }

    corpo := "Hello, RabbitMQ!"
    if len(os.Args) > 1 {
        corpo = os.Args[1]
    }

    err = ch.Publish(
        "logs",
        "",
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(corpo),
        })
    if err != nil {
        log.Fatalf("Falha ao publicar: %s", err)
    }

    log.Printf("Enviado: %s", corpo)
}

Consumidor

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "logs",
        "fanout",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar exchange: %s", err)
    }

    q, err := ch.QueueDeclare(
        "",
        false,
        false,
        true,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    err = ch.QueueBind(
        q.Name,
        "",
        "logs",
        false,
        nil)
    if err != nil {
        log.Fatalf("Falha ao vincular: %s", err)
    }

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao registrar consumidor: %s", err)
    }

    go func() {
        for d := range msgs {
            log.Printf("Recebido: %s", d.Body)
        }
    }()

    log.Println("Aguardando mensagens. Ctrl+C para sair")
    select {}
}

Execução

  1. Inicie o RabbitMQ.
  2. Execute o consumidor.
  3. Execute o publicador com argumento opcional.

Modo Roteamento (Direct)

Preparação

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
go get github.com/streadway/amqp

Produtor

package main

import (
    "log"
    "os"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "direct_logs",
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar exchange: %s", err)
    }

    if len(os.Args) < 3 {
        log.Fatalf("Uso: %s <severidade> <mensagem>", os.Args[0])
    }
    severidade := os.Args[1]
    corpo := os.Args[2]

    err = ch.Publish(
        "direct_logs",
        severidade,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(corpo),
        })
    if err != nil {
        log.Fatalf("Falha ao publicar: %s", err)
    }

    log.Printf("Enviado [%s] %s", severidade, corpo)
}

Consumidor

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "direct_logs",
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar exchange: %s", err)
    }

    q, err := ch.QueueDeclare(
        "",
        false,
        false,
        true,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    severidade := "info"
    err = ch.QueueBind(
        q.Name,
        severidade,
        "direct_logs",
        false,
        nil)
    if err != nil {
        log.Fatalf("Falha ao vincular: %s", err)
    }

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao registrar consumidor: %s", err)
    }

    go func() {
        for d := range msgs {
            log.Printf("Recebido [%s] %s", d.RoutingKey, d.Body)
        }
    }()

    log.Println("Aguardando mensagens. Ctrl+C para sair")
    select {}
}

Execução

  1. Inicie o RabbitMQ.
  2. Execute o consumidor.
  3. Execute o produtor com severidade e mensagem.

Modo Tópico e RPC

Tópico: Produtor

package main

import (
    "log"
    "os"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "topic_logs",
        "topic",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar exchange: %s", err)
    }

    if len(os.Args) < 3 {
        log.Fatalf("Uso: %s <routing_key> <mensagem>", os.Args[0])
    }
    routingKey := os.Args[1]
    corpo := os.Args[2]

    err = ch.Publish(
        "topic_logs",
        routingKey,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(corpo),
        })
    if err != nil {
        log.Fatalf("Falha ao publicar: %s", err)
    }

    log.Printf("Enviado [%s] %s", routingKey, corpo)
}

Tópico: Consumidor

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "topic_logs",
        "topic",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar exchange: %s", err)
    }

    q, err := ch.QueueDeclare(
        "",
        false,
        false,
        true,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    bindingKey := "#.info"
    err = ch.QueueBind(
        q.Name,
        bindingKey,
        "topic_logs",
        false,
        nil)
    if err != nil {
        log.Fatalf("Falha ao vincular: %s", err)
    }

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao registrar consumidor: %s", err)
    }

    go func() {
        for d := range msgs {
            log.Printf("Recebido [%s] %s", d.RoutingKey, d.Body)
        }
    }()

    log.Println("Aguardando mensagens. Ctrl+C para sair")
    select {}
}

Execução do Tópico

  1. Inicie RabbitMQ.
  2. Execute consumidor.
  3. Execute produtor com routing key e mensagem.

RPC: Servidor

package main

import (
    "log"
    "strconv"
    "github.com/streadway/amqp"
)

func fib(n int) int {
    if n <= 0 {
        return 0
    }
    if n == 1 {
        return 1
    }
    return fib(n-1) + fib(n-2)
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "rpc_queue",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao registrar consumidor: %s", err)
    }

    log.Println("Aguardando requisições RPC")

    for d := range msgs {
        n, err := strconv.Atoi(string(d.Body))
        if err != nil {
            log.Printf("Erro ao converter: %s", err)
            continue
        }
        log.Printf("Calculando fib(%d)", n)
        resposta := fib(n)

        ch.Publish(
            "",
            d.ReplyTo,
            false,
            false,
            amqp.Publishing{
                CorrelationId: d.CorrelationId,
                Body:          []byte(strconv.Itoa(resposta)),
            })
    }
}

RPC: Cliente

package main

import (
    "log"
    "os"
    "strconv"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    replyQueue, err := ch.QueueDeclare(
        "",
        false,
        false,
        true,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila de resposta: %s", err)
    }

    corrId := ""
    msgs, err := ch.Consume(
        replyQueue.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao registrar consumidor: %s", err)
    }

    if len(os.Args) < 2 {
        log.Fatalf("Uso: %s <n>", os.Args[0])
    }
    n, err := strconv.Atoi(os.Args[1])
    if err != nil {
        log.Fatalf("Argumento inválido: %s", os.Args[1])
    }

    corrId = randomString(32)
    err = ch.Publish(
        "",
        "rpc_queue",
        false,
        false,
        amqp.Publishing{
            CorrelationId: corrId,
            ReplyTo:       replyQueue.Name,
            Body:          []byte(strconv.Itoa(n)),
        })
    if err != nil {
        log.Fatalf("Falha ao publicar: %s", err)
    }

    for d := range msgs {
        if d.CorrelationId == corrId {
            log.Printf("Resposta: %s", d.Body)
            break
        }
    }
}

func randomString(n int) string {
    letras := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
    b := make([]rune, n)
    for i := range b {
        b[i] = letras[i%len(letras)]
    }
    return string(b)
}

Execução do RPC

  1. Inicie RabbitMQ.
  2. Execute o servidor RPC.
  3. Execute o cliente com um número.

Confiabilidade, Persistência e Controle de Fluxo

Para garantir a confiabilidade das mensagens e evitar perda de dados, RabbitMQ oferece:

  • Persistência de mensagens: mensagens sobrevivem a reinicializações.
  • Limitação do consumidor: controla quantas mensagens o consumidor processa simultaneamente.
  • Confirmação do consumidor: mensagem só é removida após processamento bem-sucedido.
  • Expiração de mensagens: mensagens com TTL são descartadas automaticamente.

1. Persistência

Produtor com persistência

package main

import (
    "log"
    "os"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "fila_persistente",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    if len(os.Args) < 2 {
        log.Fatalf("Uso: %s <mensagem>", os.Args[0])
    }
    corpo := os.Args[1]

    err = ch.Publish(
        "",
        q.Name,
        false,
        false,
        amqp.Publishing{
            ContentType:  "text/plain",
            Body:         []byte(corpo),
            DeliveryMode: amqp.Persistent,
        })
    if err != nil {
        log.Fatalf("Falha ao publicar: %s", err)
    }

    log.Printf("Enviado: %s", corpo)
}

2. Limitação do consumidor (QoS)

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    _, err = ch.QueueDeclare(
        "fila_persistente",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    err = ch.Qos(
        1,
        0,
        false,
    )
    if err != nil {
        log.Fatalf("Falha ao configurar QoS: %s", err)
    }

    msgs, err := ch.Consume(
        "fila_persistente",
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao registrar consumidor: %s", err)
    }

    go func() {
        for d := range msgs {
            log.Printf("Recebido: %s", d.Body)
            d.Ack(false)
        }
    }()

    log.Println("Aguardando mensagens. Ctrl+C para sair")
    select {}
}

3. Confirmação manual

O código acima já utiliza d.Ack(false) para confirmar manualmente.

4. Expiração de mensagens (TTL)

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    _, err = ch.QueueDeclare(
        "fila_persistente",
        true,
        false,
        false,
        false,
        amqp.Table{
            "x-message-ttl": 10000,
        },
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    msgs, err := ch.Consume(
        "fila_persistente",
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao registrar consumidor: %s", err)
    }

    go func() {
        for d := range msgs {
            log.Printf("Recebido: %s", d.Body)
            d.Ack(false)
        }
    }()

    log.Println("Aguardando mensagens. Ctrl+C para sair")
    select {}
}

Sistema de Alta Concorrência: Vendas, Reservas, Ingressos

Arquitetura

  1. Gerenciamento de estoque com banco de dados ou memória.
  2. RabbitMQ como fila para pedidos.
  3. Controle de concorrência para evitar vandas excessivas.
  4. Confirmação manual para garantir processamento.

1. Gerenciamento de estoque

package main

import (
    "sync"
)

type Produto struct {
    ID       string
    Quantidade int
}

var estoque = map[string]*Produto{
    "produto_1": {ID: "produto_1", Quantidade: 10},
}

var mu sync.Mutex

func reduzirEstoque(id string) bool {
    mu.Lock()
    defer mu.Unlock()

    prod, existe := estoque[id]
    if !existe || prod.Quantidade <= 0 {
        return false
    }

    prod.Quantidade--
    return true
}

2. Produtor de pedidos

package main

import (
    "log"
    "os"
    "github.com/streadway/amqp"
)

func enviarPedido(id string) {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    _, err = ch.QueueDeclare(
        "fila_pedidos",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    err = ch.Publish(
        "",
        "fila_pedidos",
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(id),
        })
    if err != nil {
        log.Fatalf("Falha ao publicar: %s", err)
    }

    log.Printf("Pedido enviado para %s", id)
}

func main() {
    if len(os.Args) < 2 {
        log.Fatalf("Uso: %s <id_produto>", os.Args[0])
    }
    enviarPedido(os.Args[1])
}

3. Consumidor de pedidos

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    _, err = ch.QueueDeclare(
        "fila_pedidos",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }

    msgs, err := ch.Consume(
        "fila_pedidos",
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao registrar consumidor: %s", err)
    }

    go func() {
        for d := range msgs {
            id := string(d.Body)
            log.Printf("Processando pedido para %s", id)

            if reduzirEstoque(id) {
                log.Printf("Compra concluída para %s", id)
                d.Ack(false)
            } else {
                log.Printf("Falha na compra de %s: sem estoque", id)
                d.Nack(false, false)
            }
        }
    }()

    log.Println("Aguardando pedidos. Ctrl+C para sair")
    select {}
}

Integração com Gin + PostgreSQL + RabbitMQ

Configuração do banco de dados

CREATE DATABASE testdb;
\c testdb
CREATE TABLE registros (
    id SERIAL PRIMARY KEY,
    dados TEXT NOT NULL
);

1. Conexão com banco

package db

import (
    "database/sql"
    "log"
    _ "github.com/lib/pq"
)

var DB *sql.DB

func InitDB() {
    var err error
    connStr := "user=seuusuario dbname=testdb sslmode=disable"
    DB, err = sql.Open("postgres", connStr)
    if err != nil {
        log.Fatal(err)
    }

    if err = DB.Ping(); err != nil {
        log.Fatal(err)
    }
}

2. Conexão com RabbitMQ

package rabbitmq

import (
    "log"
    "github.com/streadway/amqp"
)

var Channel *amqp.Channel

func InitRabbitMQ() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha ao conectar: %s", err)
    }

    Channel, err = conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }

    _, err = Channel.QueueDeclare(
        "fila_tarefas",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao declarar fila: %s", err)
    }
}

3. Servidor HTTP com Gin

package main

import (
    "encoding/json"
    "net/http"
    "github.com/gin-gonic/gin"
    "seumodulo/db"
    "seumodulo/rabbitmq"
)

type Registro struct {
    Dados string `json:"dados"`
}

func main() {
    db.InitDB()
    rabbitmq.InitRabbitMQ()

    r := gin.Default()

    r.POST("/registros", func(c *gin.Context) {
        var reg Registro
        if err := c.ShouldBindJSON(&reg); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"erro": err.Error()})
            return
        }

        corpo, _ := json.Marshal(reg)
        err := rabbitmq.Channel.Publish(
            "",
            "fila_tarefas",
            false,
            false,
            amqp.Publishing{
                ContentType: "application/json",
                Body:        corpo,
                DeliveryMode: amqp.Persistent,
            })
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"erro": "Falha ao publicar"})
            return
        }

        c.JSON(http.StatusAccepted, gin.H{"status": "Aceito"})
    })

    r.Run(":8080")
}

4. Consumidor que salva no PostgreSQL

package main

import (
    "encoding/json"
    "log"
    "github.com/streadway/amqp"
    "seumodulo/db"
)

type Registro struct {
    Dados string `json:"dados"`
}

func main() {
    db.InitDB()

    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Falha na conexão: %s", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Falha ao abrir canal: %s", err)
    }
    defer ch.Close()

    msgs, err := ch.Consume(
        "fila_tarefas",
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Falha ao registrar consumidor: %s", err)
    }

    log.Println("Aguardando mensagens. Ctrl+C para sair")

    for d := range msgs {
        var reg Registro
        if err := json.Unmarshal(d.Body, &reg); err != nil {
            log.Printf("Erro ao decodificar: %s", err)
            d.Nack(false, false)
            continue
        }

        _, err := db.DB.Exec("INSERT INTO registros(dados) VALUES($1)", reg.Dados)
        if err != nil {
            log.Printf("Erro ao inserir: %s", err)
            d.Nack(false, false)
            continue
        }

        log.Printf("Inserido: %s", reg.Dados)
        d.Ack(false)
    }
}

Sistema de Alta Concorrência com Redis e Nginx

Arquitetura

  1. Balanceamento de carga com Nginx.
  2. Limitação de taxa com Redis.
  3. Suavização de picos com RabbitMQ.
  4. Persistência com PostgreSQL.

Configuração do banco

CREATE DATABASE testdb;
\c testdb
CREATE TABLE registros (
    id SERIAL PRIMARY KEY,
    dados TEXT NOT NULL
);

Redis: conexão e limitação

package redisdb

import (
    "context"
    "github.com/go-redis/redis/v8"
)

var ctx = context.Background()
var Rdb *redis.Client

func InitRedis() {
    Rdb = redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
}

func LimitTaxa(chave string, limite int) bool {
    count, err := Rdb.Incr(ctx, chave).Result()
    if err != nil {
        return false
    }

    if count == 1 {
        Rdb.Expire(ctx, chave, 1)
    }

    return count <= int64(limite)
}

Consumidor (mesmo código anterior)

// reutiliza o consumidor que insere no PostgreSQL

Servidor HTTP com limitação

package main

import (
    "encoding/json"
    "net/http"
    "github.com/gin-gonic/gin"
    "seumodulo/db"
    "seumodulo/rabbitmq"
    "seumodulo/redisdb"
)

type Registro struct {
    Dados string `json:"dados"`
}

func main() {
    db.InitDB()
    redisdb.InitRedis()
    rabbitmq.InitRabbitMQ()

    r := gin.Default()

    r.POST("/registros", func(c *gin.Context) {
        var reg Registro
        if err := c.ShouldBindJSON(&reg); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"erro": err.Error()})
            return
        }

        if !redisdb.LimitTaxa("limite_geral", 100) {
            c.JSON(http.StatusTooManyRequests, gin.H{"erro": "Muitas requisições"})
            return
        }

        corpo, _ := json.Marshal(reg)
        err := rabbitmq.Channel.Publish(
            "",
            "fila_tarefas",
            false,
            false,
            amqp.Publishing{
                ContentType:  "application/json",
                Body:         corpo,
                DeliveryMode: amqp.Persistent,
            })
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"erro": "Falha ao publicar"})
            return
        }

        c.JSON(http.StatusAccepted, gin.H{"status": "Aceito"})
    })

    r.Run(":8080")
}

Balanceamento com Nginx

http {
    upstream meuapp {
        server localhost:8080;
        server localhost:8081;
        server localhost:8082;
    }

    server {
        listen 80;

        location / {
            proxy_pass http://meuapp;
        }
    }
}

Tags: RabbitMQ go AMQP filas de mensagens microserviços

Publicado em 6-27 22:49