New protocol version released: This page may contain outdated information.
Optimize your HAIP applications with intelligent flow control mechanisms. The SDK’s credit-based system prevents message loss, manages back-pressure, and ensures optimal performance under varying load conditions.

Flow Control Overview

What is Flow Control?

Flow control is a mechanism that regulates the rate of message transmission between client and server to:
  • Prevent message loss due to buffer overflow
  • Ensure fair resource allocation
  • Maintain system stability under load
  • Provide back-pressure feedback

Credit-Based System

HAIP uses a credit-based flow control system where:
  • Message Credits: Control how many messages can be sent
  • Byte Credits: Control how much data can be sent
  • Channel-Specific: Each channel has independent credit pools
  • Dynamic Updates: Credits are replenished based on server capacity

Flow Control Configuration

Basic Configuration

import { createHAIPClient } from "haip-sdk";

const client = createHAIPClient({
  url: "ws://localhost:8080",
  token: "your-jwt-token",
  transport: "websocket",
  flowControl: {
    initialCredits: 10, // Initial message credits per channel
    initialCreditBytes: 1024 * 1024, // Initial byte credits (1MB)
    maxCredits: 100, // Maximum credits per channel
    maxCreditBytes: 10 * 1024 * 1024, // Maximum byte credits (10MB)
  },
});

Advanced Configuration

const client = createHAIPClient({
  url: "ws://localhost:8080",
  token: "your-jwt-token",
  transport: "websocket",
  flowControl: {
    initialCredits: {
      USER: 20,
      AGENT: 10,
      SYSTEM: 5,
    },
    initialCreditBytes: {
      USER: 2 * 1024 * 1024, // 2MB for user messages
      AGENT: 5 * 1024 * 1024, // 5MB for agent responses
      SYSTEM: 1024 * 1024, // 1MB for system messages
    },
    maxCredits: {
      USER: 50,
      AGENT: 25,
      SYSTEM: 10,
    },
    maxCreditBytes: {
      USER: 10 * 1024 * 1024,
      AGENT: 20 * 1024 * 1024,
      SYSTEM: 5 * 1024 * 1024,
    },
  },
});

Flow Control State

Checking Current State

// Get current flow control state
const state = client.getConnectionState();

console.log("Flow control state:", {
  userCredits: state.credits.get("USER"),
  agentCredits: state.credits.get("AGENT"),
  systemCredits: state.credits.get("SYSTEM"),
  userBytes: state.byteCredits.get("USER"),
  agentBytes: state.byteCredits.get("AGENT"),
  systemBytes: state.byteCredits.get("SYSTEM"),
});

Monitoring Flow Control

class FlowControlMonitor {
  constructor(private client: any) {
    this.setupMonitoring();
  }

  private setupMonitoring() {
    // Monitor flow control updates
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "FLOW_UPDATE") {
        this.handleFlowUpdate(message);
      }
    });

    // Periodic state check
    setInterval(() => {
      this.logFlowState();
    }, 5000);
  }

  private handleFlowUpdate(message: HAIPMessage) {
    const { channel, add_messages, add_bytes } = message.payload;
    console.log(`Flow update for ${channel}:`, {
      messageCredits: add_messages,
      byteCredits: add_bytes,
    });
  }

  private logFlowState() {
    const state = this.client.getConnectionState();
    console.log("Current flow state:", {
      user: {
        messages: state.credits.get("USER"),
        bytes: state.byteCredits.get("USER"),
      },
      agent: {
        messages: state.credits.get("AGENT"),
        bytes: state.byteCredits.get("AGENT"),
      },
      system: {
        messages: state.credits.get("SYSTEM"),
        bytes: state.byteCredits.get("SYSTEM"),
      },
    });
  }
}

// Usage
const monitor = new FlowControlMonitor(client);

Sending Flow Updates

Requesting More Credits

// Request more message credits
await client.sendFlowUpdate("USER", 10); // Request 10 more message credits

// Request more byte credits
await client.sendFlowUpdate("USER", 0, 1024 * 1024); // Request 1MB more byte credits

// Request both
await client.sendFlowUpdate("USER", 10, 1024 * 1024); // Request both message and byte credits

Automatic Credit Management

class AutomaticFlowControl {
  private thresholds = {
    messageThreshold: 5, // Request more when below 5 message credits
    byteThreshold: 512 * 1024, // Request more when below 512KB byte credits
  };

  constructor(private client: any) {
    this.setupAutomaticControl();
  }

  private setupAutomaticControl() {
    // Check flow control before sending messages
    const originalSendTextMessage = this.client.sendTextMessage.bind(
      this.client
    );

    this.client.sendTextMessage = async (
      channel: string,
      text: string,
      author?: string,
      runId?: string
    ) => {
      await this.ensureCredits(channel);
      return originalSendTextMessage(channel, text, author, runId);
    };
  }

  private async ensureCredits(channel: string) {
    const state = this.client.getConnectionState();
    const messageCredits = state.credits.get(channel) || 0;
    const byteCredits = state.byteCredits.get(channel) || 0;

    // Request more message credits if needed
    if (messageCredits < this.thresholds.messageThreshold) {
      const needed = Math.max(
        10,
        this.thresholds.messageThreshold - messageCredits
      );
      await this.client.sendFlowUpdate(channel, needed);
    }

    // Request more byte credits if needed
    if (byteCredits < this.thresholds.byteThreshold) {
      const needed = Math.max(
        1024 * 1024,
        this.thresholds.byteThreshold - byteCredits
      );
      await this.client.sendFlowUpdate(channel, 0, needed);
    }
  }
}

// Usage
const flowControl = new AutomaticFlowControl(client);

Channel Control

Pausing and Resuming Channels

// Pause message flow on a channel
await client.pauseChannel("USER");

// Resume message flow on a channel
await client.resumeChannel("USER");

Channel State Management

class ChannelManager {
  private pausedChannels = new Set<string>();

  constructor(private client: any) {
    this.setupChannelHandling();
  }

  private setupChannelHandling() {
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "PAUSE_CHANNEL") {
        this.pausedChannels.add(message.payload.channel);
        console.log(`Channel ${message.payload.channel} paused`);
      } else if (message.type === "RESUME_CHANNEL") {
        this.pausedChannels.delete(message.payload.channel);
        console.log(`Channel ${message.payload.channel} resumed`);
      }
    });
  }

  isChannelPaused(channel: string): boolean {
    return this.pausedChannels.has(channel);
  }

  async sendWithChannelCheck(channel: string, messageFn: () => Promise<void>) {
    if (this.isChannelPaused(channel)) {
      console.log(`Channel ${channel} is paused, queuing message`);
      // Queue the message for later
      this.queueMessage(channel, messageFn);
      return;
    }

    await messageFn();
  }

  private queueMessage(channel: string, messageFn: () => Promise<void>) {
    // Implementation for message queuing
    // This would store the message and send it when the channel resumes
  }
}

// Usage
const channelManager = new ChannelManager(client);

// Send message with channel check
await channelManager.sendWithChannelCheck("USER", async () => {
  await client.sendTextMessage("USER", "Hello", "user", runId);
});

Message Queuing

Credit-Aware Message Queue

class CreditAwareQueue {
  private queues = new Map<string, Array<() => Promise<void>>>();
  private processing = new Map<string, boolean>();

  constructor(private client: any) {
    this.setupFlowControlHandling();
  }

  private setupFlowControlHandling() {
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "FLOW_UPDATE") {
        const { channel, add_messages, add_bytes } = message.payload;
        if (add_messages > 0 || add_bytes > 0) {
          this.processQueue(channel);
        }
      }
    });
  }

  async enqueue(channel: string, messageFn: () => Promise<void>) {
    if (!this.queues.has(channel)) {
      this.queues.set(channel, []);
    }

    this.queues.get(channel)!.push(messageFn);
    await this.processQueue(channel);
  }

  private async processQueue(channel: string) {
    if (this.processing.get(channel)) {
      return; // Already processing
    }

    this.processing.set(channel, true);

    try {
      const queue = this.queues.get(channel) || [];

      while (queue.length > 0) {
        const state = this.client.getConnectionState();
        const messageCredits = state.credits.get(channel) || 0;
        const byteCredits = state.byteCredits.get(channel) || 0;

        // Check if we have credits to send
        if (messageCredits <= 0 || byteCredits <= 0) {
          break; // Wait for more credits
        }

        const messageFn = queue.shift();
        if (messageFn) {
          try {
            await messageFn();
          } catch (error) {
            console.error(
              `Failed to send queued message on ${channel}:`,
              error
            );
          }
        }
      }
    } finally {
      this.processing.set(channel, false);
    }
  }

  getQueueLength(channel: string): number {
    return this.queues.get(channel)?.length || 0;
  }

  clearQueue(channel: string) {
    this.queues.set(channel, []);
  }
}

// Usage
const queue = new CreditAwareQueue(client);

// Queue messages
await queue.enqueue("USER", async () => {
  await client.sendTextMessage("USER", "Message 1", "user", runId);
});

await queue.enqueue("USER", async () => {
  await client.sendTextMessage("USER", "Message 2", "user", runId);
});

// Check queue status
console.log("User channel queue length:", queue.getQueueLength("USER"));

Flow Control Strategies

Conservative Strategy

class ConservativeFlowControl {
  constructor(private client: any) {
    this.setupConservativeControl();
  }

  private setupConservativeControl() {
    // Request credits early
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "FLOW_UPDATE") {
        const { channel, add_messages, add_bytes } = message.payload;

        // If we received credits, request more when we're at 50%
        if (add_messages > 0) {
          setTimeout(async () => {
            const state = this.client.getConnectionState();
            const currentCredits = state.credits.get(channel) || 0;

            if (currentCredits <= add_messages / 2) {
              await this.client.sendFlowUpdate(channel, add_messages);
            }
          }, 1000);
        }
      }
    });
  }
}

Aggressive Strategy

class AggressiveFlowControl {
  constructor(private client: any) {
    this.setupAggressiveControl();
  }

  private setupAggressiveControl() {
    // Request maximum credits immediately
    this.client.on("connect", async () => {
      const channels = ["USER", "AGENT", "SYSTEM"];

      for (const channel of channels) {
        await this.client.sendFlowUpdate(channel, 100, 10 * 1024 * 1024);
      }
    });
  }
}

Adaptive Strategy

class AdaptiveFlowControl {
  private messageRates = new Map<string, number>();
  private lastRequestTime = new Map<string, number>();

  constructor(private client: any) {
    this.setupAdaptiveControl();
  }

  private setupAdaptiveControl() {
    // Track message rates
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "MESSAGE_START") {
        const channel = message.payload.channel;
        const now = Date.now();
        const rate = this.messageRates.get(channel) || 0;

        // Calculate new rate (exponential moving average)
        this.messageRates.set(channel, rate * 0.9 + 0.1);
      }
    });

    // Adaptive credit requests
    setInterval(() => {
      this.requestAdaptiveCredits();
    }, 5000);
  }

  private async requestAdaptiveCredits() {
    const channels = ["USER", "AGENT", "SYSTEM"];

    for (const channel of channels) {
      const rate = this.messageRates.get(channel) || 0;
      const lastRequest = this.lastRequestTime.get(channel) || 0;
      const now = Date.now();

      // Request more credits if rate is high and we haven't requested recently
      if (rate > 0.5 && now - lastRequest > 10000) {
        const credits = Math.min(50, Math.floor(rate * 100));
        await this.client.sendFlowUpdate(channel, credits);
        this.lastRequestTime.set(channel, now);
      }
    }
  }
}

Flow Control Testing

Mock Flow Control Testing

class MockFlowControl {
  private credits = new Map<string, number>();
  private byteCredits = new Map<string, number>();

  constructor() {
    // Initialize with default credits
    ["USER", "AGENT", "SYSTEM"].forEach((channel) => {
      this.credits.set(channel, 10);
      this.byteCredits.set(channel, 1024 * 1024);
    });
  }

  consumeCredits(
    channel: string,
    messageCredits: number = 1,
    byteCredits: number = 0
  ): boolean {
    const currentCredits = this.credits.get(channel) || 0;
    const currentByteCredits = this.byteCredits.get(channel) || 0;

    if (currentCredits >= messageCredits && currentByteCredits >= byteCredits) {
      this.credits.set(channel, currentCredits - messageCredits);
      this.byteCredits.set(channel, currentByteCredits - byteCredits);
      return true;
    }

    return false;
  }

  addCredits(
    channel: string,
    messageCredits: number = 0,
    byteCredits: number = 0
  ) {
    const currentCredits = this.credits.get(channel) || 0;
    const currentByteCredits = this.byteCredits.get(channel) || 0;

    this.credits.set(channel, currentCredits + messageCredits);
    this.byteCredits.set(channel, currentByteCredits + byteCredits);
  }

  getCredits(channel: string): { messages: number; bytes: number } {
    return {
      messages: this.credits.get(channel) || 0,
      bytes: this.byteCredits.get(channel) || 0,
    };
  }
}

// Usage in tests
const mockFlowControl = new MockFlowControl();

// Test credit consumption
const canSend = mockFlowControl.consumeCredits("USER", 1, 100);
expect(canSend).toBe(true);

// Test credit exhaustion
for (let i = 0; i < 10; i++) {
  mockFlowControl.consumeCredits("USER", 1);
}

const canSendMore = mockFlowControl.consumeCredits("USER", 1);
expect(canSendMore).toBe(false);

Flow Control Integration Testing

describe("Flow Control", () => {
  test("should respect credit limits", async () => {
    const client = createHAIPClient({
      url: "ws://localhost:8080",
      token: "test-token",
      transport: "websocket",
      flowControl: {
        initialCredits: 2,
        initialCreditBytes: 1024,
      },
    });

    // Mock flow control state
    const mockState = {
      credits: new Map([["USER", 2]]),
      byteCredits: new Map([["USER", 1024]]),
    };

    jest.spyOn(client, "getConnectionState").mockReturnValue(mockState);

    // Should be able to send 2 messages
    await client.sendTextMessage("USER", "Message 1", "user", "run-1");
    await client.sendTextMessage("USER", "Message 2", "user", "run-1");

    // Third message should fail or be queued
    await expect(
      client.sendTextMessage("USER", "Message 3", "user", "run-1")
    ).rejects.toThrow("FLOW_CONTROL");
  });
});

Performance Monitoring

Flow Control Metrics

class FlowControlMetrics {
  private metrics = {
    creditRequests: 0,
    creditReceived: 0,
    messagesBlocked: 0,
    averageWaitTime: 0,
    totalWaitTime: 0,
  };

  recordCreditRequest() {
    this.metrics.creditRequests++;
  }

  recordCreditReceived(amount: number) {
    this.metrics.creditReceived += amount;
  }

  recordMessageBlocked(waitTime: number) {
    this.metrics.messagesBlocked++;
    this.metrics.totalWaitTime += waitTime;
    this.metrics.averageWaitTime =
      this.metrics.totalWaitTime / this.metrics.messagesBlocked;
  }

  getMetrics() {
    return {
      ...this.metrics,
      creditUtilization:
        this.metrics.creditReceived > 0
          ? (this.metrics.creditRequests / this.metrics.creditReceived) * 100
          : 0,
    };
  }

  reset() {
    this.metrics = {
      creditRequests: 0,
      creditReceived: 0,
      messagesBlocked: 0,
      averageWaitTime: 0,
      totalWaitTime: 0,
    };
  }
}

// Usage
const flowMetrics = new FlowControlMetrics();

// Monitor flow control performance
setInterval(() => {
  const metrics = flowMetrics.getMetrics();
  console.log("Flow control metrics:", metrics);
}, 10000);

Next Steps