Design and Implementation of End-to-End Message Ordering and Reliability in IM Systems

In Instant Messaging (IM) chat systems, ensuring messages are **not lost, not duplicated, and in the correct order** is one of the most critical and challenging objectives. This article provides a comprehensive, end-to-end solution for **enterprise-grade IM message ordering and reliability**, covering architecture design, protocol mechanisms, data models, and Java engineering implementation.

I. Background and Design Objectives

1. Core Challenges in IM Systems

In real-world network environments, IM systems must contend with:

  • Network jitter, packet loss, and reconnections.
  • Multiple endpoints online at the same time (mobile, PC, web).
  • Message reordering across distributed service nodes.
  • Clock skew between clients and servers.
  • Service outages, process restarts, and message replays.

2. Deconstructing Design Objectives

| Objective | Meaning |
|---|---|
| No Loss | Any message confirmed as sent must eventually be delivered. |
| No Duplication | Retransmissions or replays must not result in multiple deliveries. |
| No Reordering | Messages within a conversation must be displayed to the user in the correct (server-assigned) order. |
| High Availability | Service restarts or node failovers must not compromise correctness. |
| Low Latency | Consistency guarantees must not come at the expense of user experience. |

II. Overall Design Philosophy (Conclusion First)

Core Principle: Tolerate out-of-order arrival, but guarantee in-order display; prioritize reliability over strict ordering.

We adopt the following overall strategy:

  • Sequence numbers (Seq) are allocated centrally by the server.
  • Clients never rely on local timestamps for ordering.
  • Messages may arrive out of order, but display must be sequential.
  • Reliability is achieved through ACKs, retries, and persistence.
  • Ordering is managed by Seq and reordering windows.
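To make the "ACKs, retries" item concrete, here is a minimal sender-side sketch. It assumes a hypothetical `PendingAckTracker` that keeps each message pending until the server ACKs it, resends it with exponential backoff, and gives up after a fixed number of attempts. The 1 s base timeout and the 5-attempt limit are illustrative values, not part of the original design, and the current time is passed in explicitly so the logic is easy to test.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sender-side tracker: a message stays pending until the server ACKs it
 * and is resent with exponential backoff. collectRetries() is assumed to run on one timer thread.
 */
class PendingAckTracker {
    private static final long BASE_TIMEOUT_MS = 1_000; // illustrative value
    private static final int MAX_ATTEMPTS = 5;          // original send + 4 resends (illustrative)

    private static final class Pending {
        final String payload;
        int attempts = 1;
        long deadlineMs;

        Pending(String payload, long deadlineMs) {
            this.payload = payload;
            this.deadlineMs = deadlineMs;
        }
    }

    private final Map<Long, Pending> pending = new ConcurrentHashMap<>();

    /** Call after the first send; the same messageId is reused on every resend so the server can deduplicate. */
    void onSent(long messageId, String payload, long nowMs) {
        pending.put(messageId, new Pending(payload, nowMs + BASE_TIMEOUT_MS));
    }

    /** Server ACK received; duplicate or late ACKs are harmless. */
    void onAck(long messageId) {
        pending.remove(messageId);
    }

    /** Payload to resend, or null if the message is no longer pending. */
    String payloadOf(long messageId) {
        Pending p = pending.get(messageId);
        return p == null ? null : p.payload;
    }

    /** Returns IDs due for resending; IDs that ran out of attempts are moved to {@code failed}. */
    List<Long> collectRetries(long nowMs, List<Long> failed) {
        List<Long> due = new ArrayList<>();
        Iterator<Map.Entry<Long, Pending>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Pending> entry = it.next();
            Pending p = entry.getValue();
            if (nowMs < p.deadlineMs) {
                continue; // still waiting for the ACK
            }
            if (p.attempts >= MAX_ATTEMPTS) {
                it.remove();
                failed.add(entry.getKey()); // e.g. surface "send failed" in the UI
            } else {
                p.attempts++;
                // Backoff doubles with each resend: 2 s, 4 s, 8 s, 16 s
                p.deadlineMs = nowMs + (BASE_TIMEOUT_MS << (p.attempts - 1));
                due.add(entry.getKey());
            }
        }
        return due;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Because every resend carries the original messageId, retries can only ever produce duplicates, never losses; the receiver is then responsible for discarding those duplicates.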

III. Message Ordering Design (Ordering)

3.1 Globally Unique Message ID (MessageId)

Design Purpose
  • Deduplication
  • Idempotency
  • Traceability
  • Uniqueness in distributed environments
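Deduplication and idempotency depend on the receiver remembering which MessageIds it has already processed. The sketch below assumes a hypothetical in-memory `RecentMessageIds` class that remembers only the most recent N IDs (an LRU window built on `LinkedHashMap`); duplicates older than the window would slip through, so a server would more likely back this with Redis or a unique index on the message table.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical bounded "seen" set: remembers the most recently seen message IDs. */
class RecentMessageIds {
    private final Set<Long> seen;

    RecentMessageIds(int capacity) {
        // Access-ordered LinkedHashMap: once full, the least recently seen ID is evicted.
        Map<Long, Boolean> lru = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
                return size() > capacity;
            }
        };
        this.seen = Collections.synchronizedSet(Collections.newSetFromMap(lru));
    }

    /** True the first time an ID is seen; false for retransmissions and replays. */
    boolean firstDelivery(long messageId) {
        return seen.add(messageId);
    }
}
```

A handler would call `firstDelivery(messageId)` before persisting or pushing a message and simply re-ACK (without reprocessing) when it returns false, which keeps client retries idempotent.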
Solution

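Use a **variant of the Snowflake algorithm**, packing three fields into one 64-bit `long`:


| 1 bit: sign (always 0) | 41 bits: timestamp | 10 bits: instance ID | 12 bits: sequence number |
  • Roughly time-ordered: IDs increase with time.
  • Decentralized: each instance generates IDs without a central coordinator.
  • High throughput: up to 4,096 IDs per millisecond per instance.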
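The field widths used by the generator below (12-bit sequence, 10-bit instance ID, timestamp in the remaining bits under the sign bit) set its limits. A quick worked calculation:

```latex
\begin{aligned}
\text{timestamp bits} &= 63 - 10 - 12 = 41 \quad (\text{bit 63 is the sign bit, kept at } 0)\\
\text{lifetime} &= 2^{41}\ \text{ms} \approx 2.199 \times 10^{12}\ \text{ms} \approx 69.7\ \text{years after START\_EPOCH}\\
\text{instances} &= 2^{10} = 1024\\
\text{IDs per instance} &= 2^{12} = 4096\ \text{per ms} \;\Rightarrow\; \approx 4.1 \times 10^{6}\ \text{per second}
\end{aligned}
```

With the 2024-01-01 epoch used below, the timestamp field lasts until roughly 2093.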
Global Unique ID Generator (Snowflake Variant)

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class MessageIdGenerator {
    // Custom epoch: 2024-01-01T00:00:00Z in milliseconds
    private static final long START_EPOCH = 1704067200000L;

    // Bit allocation for each part (the timestamp uses the remaining 41 bits below the sign bit)
    private static final int SEQUENCE_BITS = 12;
    private static final int INSTANCE_ID_BITS = 10;
    private static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;
    private static final long MAX_INSTANCE_ID = (1L << INSTANCE_ID_BITS) - 1;

    // Bit shifts
    private static final int TIMESTAMP_SHIFT = SEQUENCE_BITS + INSTANCE_ID_BITS;
    private static final int INSTANCE_ID_SHIFT = SEQUENCE_BITS;

    private final long instanceId;
    private long lastTimestamp = -1L;
    private long currentSequence = 0L;

    public MessageIdGenerator(@Value("${im.server.instance-id:0}") long instanceId) {
        // Every instance needs a distinct ID in [0, MAX_INSTANCE_ID]; otherwise IDs can collide
        if (instanceId < 0 || instanceId > MAX_INSTANCE_ID) {
            throw new IllegalArgumentException(
                    "Instance ID must be between 0 and " + MAX_INSTANCE_ID + ", got " + instanceId);
        }
        this.instanceId = instanceId;
    }

    public synchronized long generateId() {
        long currentTimestamp = getCurrentTimestamp();

        // Clock moved backwards (e.g., an NTP correction): refuse rather than risk duplicate IDs.
        if (currentTimestamp < lastTimestamp) {
            throw new IllegalStateException("Clock moved backwards by "
                    + (lastTimestamp - currentTimestamp) + " ms. Cannot generate ID.");
        }

        // If timestamps are the same, increment sequence number.
        if (currentTimestamp == lastTimestamp) {
            currentSequence = (currentSequence + 1) & MAX_SEQUENCE;
            // If sequence number rolls over, wait for the next millisecond.
            if (currentSequence == 0) {
                currentTimestamp = waitNextMillisecond(lastTimestamp);
            }
        } else {
            // Reset sequence for a new millisecond.
            currentSequence = 0L;
        }

        lastTimestamp = currentTimestamp;

        // Construct the unique ID: (timestamp - epoch) | instanceId | sequence
        return ((currentTimestamp - START_EPOCH) << TIMESTAMP_SHIFT)
                | (instanceId << INSTANCE_ID_SHIFT)
                | currentSequence;
    }

    // Helper method to parse the components of a generated ID
    public static IdComponents parseId(long id) {
        return new IdComponents(
                (id >>> TIMESTAMP_SHIFT) + START_EPOCH,
                (id >>> INSTANCE_ID_SHIFT) & MAX_INSTANCE_ID,
                id & MAX_SEQUENCE
        );
    }

    // Busy-wait until the clock advances past lastTimestamp
    private long waitNextMillisecond(long lastTimestamp) {
        long timestamp = getCurrentTimestamp();
        while (timestamp <= lastTimestamp) {
            timestamp = getCurrentTimestamp();
        }
        return timestamp;
    }

    private long getCurrentTimestamp() {
        return System.currentTimeMillis();
    }

    // Decoded ID fields. A Java 16+ record replaces Lombok's @Value,
    // whose name clashed with Spring's @Value imported above.
    public record IdComponents(long timestamp, long instanceId, long sequence) {}
}
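To see why the next subsection still needs a per-conversation sequence, the sketch below re-implements the same bit packing as a standalone class (the name `IdLayoutDemo` is hypothetical, and Spring is left out) and composes two IDs minted in the same millisecond on different instances. They sort by instance ID, not by the order in which the messages were sent; with clock skew between instances, even the timestamp field can disagree with real send order.

```java
import java.util.Arrays;

/** Standalone re-implementation of the ID layout above, for illustration only. */
class IdLayoutDemo {
    static final long START_EPOCH = 1704067200000L;
    static final int SEQUENCE_BITS = 12;
    static final int INSTANCE_ID_BITS = 10;
    static final int TIMESTAMP_SHIFT = SEQUENCE_BITS + INSTANCE_ID_BITS;
    static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;
    static final long MAX_INSTANCE_ID = (1L << INSTANCE_ID_BITS) - 1;

    static long compose(long timestampMs, long instanceId, long sequence) {
        return ((timestampMs - START_EPOCH) << TIMESTAMP_SHIFT)
                | (instanceId << SEQUENCE_BITS)
                | sequence;
    }

    /** Returns {timestampMs, instanceId, sequence}. */
    static long[] parse(long id) {
        return new long[] {
            (id >>> TIMESTAMP_SHIFT) + START_EPOCH,
            (id >>> SEQUENCE_BITS) & MAX_INSTANCE_ID,
            id & MAX_SEQUENCE
        };
    }

    public static void main(String[] args) {
        long t = START_EPOCH + 1_000;
        // Two messages in one conversation, same millisecond, handled by different instances:
        long sentFirst = compose(t, 7, 0);  // instance 7
        long sentSecond = compose(t, 3, 0); // instance 3
        System.out.println(sentSecond < sentFirst);           // prints true: ID order != send order
        System.out.println(Arrays.toString(parse(sentFirst))); // prints [1704067201000, 7, 0]
    }
}
```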

3.2 Conversation-Level Sequence Number (Session Seq)

Why is Session Seq Still Needed?

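MessageId guarantees global uniqueness but **cannot guarantee order within a conversation**: IDs minted on different instances are only roughly time-ordered, and clock skew between instances can invert them.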

IM ordering requirements are:

  • Strict ordering within a one-on-one or group chat.
  • No ordering constraints between different conversations.
Solution
  • Each conversation (sessionId) maintains its own independent, incrementing sequence.
  • Use the atomic **Redis INCR** command.
  • Server-side allocation only.
Conversation Sequence Number Generator

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class ConversationSequenceService {

    // StringRedisTemplate stores values as plain strings, which Redis INCR requires
    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String SEQ_KEY_PREFIX = "im:conv:seq:";

    /**
     * Returns the next sequence number for a conversation (1, 2, 3, ...).
     * Redis INCR is atomic, so concurrent callers on different nodes never get the same value.
     */
    public long generateNextSequence(String conversationId) {
        String sequenceKey = SEQ_KEY_PREFIX + conversationId;

        // Atomically increment; a missing key counts as 0, so the first value is 1
        Long sequence = redisTemplate.opsForValue().increment(sequenceKey);

        // increment() returns null only inside a pipeline or MULTI/EXEC transaction
        if (sequence == null) {
            throw new IllegalStateException("Failed to generate sequence number from Redis.");
        }

        // No wrap-around handling: overflow needs ~9.2e18 messages in one conversation, and
        // resetting to 0 would break monotonic ordering (and hand out duplicate Seqs under
        // concurrency). Redis itself rejects an INCR past the signed 64-bit maximum with an error.
        return sequence;
    }

    /**
     * Seeds a conversation's counter, e.g. from the highest Seq already persisted after Redis
     * has lost its data. setIfAbsent (SETNX) never overwrites an existing, active counter.
     */
    public void initializeSequence(String conversationId, long initialValue) {
        String sequenceKey = SEQ_KEY_PREFIX + conversationId;
        redisTemplate.opsForValue().setIfAbsent(sequenceKey, String.valueOf(initialValue));
    }
}
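On the receiving side, the per-conversation Seq is what makes "out-of-order arrival, in-order display" work. A minimal client-side sketch of a reordering window, assuming a hypothetical `ReorderBuffer` per conversation that starts from the last displayed Seq: a message at exactly `lastDelivered + 1` is released together with any buffered successors, Seqs at or below `lastDelivered` are dropped as duplicates, and higher Seqs wait in the buffer. The gap it reports would be fetched from the server; that pull request, and the timeout that triggers it, are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical per-conversation reordering window keyed by server-assigned Seq. */
class ReorderBuffer<M> {
    private final TreeMap<Long, M> waiting = new TreeMap<>();
    private long lastDelivered;

    ReorderBuffer(long lastDeliveredSeq) {
        this.lastDelivered = lastDeliveredSeq;
    }

    /** Accepts one message and returns the messages now displayable, in Seq order. */
    List<M> offer(long seq, M message) {
        List<M> ready = new ArrayList<>();
        if (seq <= lastDelivered) {
            return ready; // already displayed: duplicate or replay
        }
        waiting.putIfAbsent(seq, message);
        // Release the contiguous run that starts right after the last displayed Seq.
        while (!waiting.isEmpty() && waiting.firstKey() == lastDelivered + 1) {
            ready.add(waiting.pollFirstEntry().getValue());
            lastDelivered++;
        }
        return ready;
    }

    /** Inclusive range {from, to} of missing Seqs blocking the buffer, or null if none. */
    long[] missingRange() {
        return waiting.isEmpty() ? null : new long[] {lastDelivered + 1, waiting.firstKey() - 1};
    }

    long lastDeliveredSeq() {
        return lastDelivered;
    }
}
```

Keeping one buffer per conversation matches the requirement above: ordering is strict within a conversation, while different conversations never block each other.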


Published on 6-26 00:13