New protocol version released: This page may contain outdated information.
Flow Control Overview
What is Flow Control?
Flow control is a mechanism that regulates the rate of message transmission between client and server to:- Prevent message loss due to buffer overflow
 - Ensure fair resource allocation
 - Maintain system stability under load
 - Provide back-pressure feedback
 
Credit-Based System
HAIP uses a credit-based flow control system where:- Message Credits: Control how many messages can be sent
 - Byte Credits: Control how much data can be sent
 - Channel-Specific: Each channel has independent credit pools
 - Dynamic Updates: Credits are replenished based on server capacity
 
Flow Control Configuration
Basic Configuration
Copy
import { createHAIPClient } from "haip-sdk";
const client = createHAIPClient({
  url: "ws://localhost:8080",
  token: "your-jwt-token",
  transport: "websocket",
  flowControl: {
    initialCredits: 10, // Initial message credits per channel
    initialCreditBytes: 1024 * 1024, // Initial byte credits (1MB)
    maxCredits: 100, // Maximum credits per channel
    maxCreditBytes: 10 * 1024 * 1024, // Maximum byte credits (10MB)
  },
});
Advanced Configuration
Copy
const client = createHAIPClient({
  url: "ws://localhost:8080",
  token: "your-jwt-token",
  transport: "websocket",
  flowControl: {
    initialCredits: {
      USER: 20,
      AGENT: 10,
      SYSTEM: 5,
    },
    initialCreditBytes: {
      USER: 2 * 1024 * 1024, // 2MB for user messages
      AGENT: 5 * 1024 * 1024, // 5MB for agent responses
      SYSTEM: 1024 * 1024, // 1MB for system messages
    },
    maxCredits: {
      USER: 50,
      AGENT: 25,
      SYSTEM: 10,
    },
    maxCreditBytes: {
      USER: 10 * 1024 * 1024,
      AGENT: 20 * 1024 * 1024,
      SYSTEM: 5 * 1024 * 1024,
    },
  },
});
Flow Control State
Checking Current State
Copy
// Get current flow control state
const state = client.getConnectionState();
console.log("Flow control state:", {
  userCredits: state.credits.get("USER"),
  agentCredits: state.credits.get("AGENT"),
  systemCredits: state.credits.get("SYSTEM"),
  userBytes: state.byteCredits.get("USER"),
  agentBytes: state.byteCredits.get("AGENT"),
  systemBytes: state.byteCredits.get("SYSTEM"),
});
Monitoring Flow Control
Copy
class FlowControlMonitor {
  constructor(private client: any) {
    this.setupMonitoring();
  }
  private setupMonitoring() {
    // Monitor flow control updates
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "FLOW_UPDATE") {
        this.handleFlowUpdate(message);
      }
    });
    // Periodic state check
    setInterval(() => {
      this.logFlowState();
    }, 5000);
  }
  private handleFlowUpdate(message: HAIPMessage) {
    const { channel, add_messages, add_bytes } = message.payload;
    console.log(`Flow update for ${channel}:`, {
      messageCredits: add_messages,
      byteCredits: add_bytes,
    });
  }
  private logFlowState() {
    const state = this.client.getConnectionState();
    console.log("Current flow state:", {
      user: {
        messages: state.credits.get("USER"),
        bytes: state.byteCredits.get("USER"),
      },
      agent: {
        messages: state.credits.get("AGENT"),
        bytes: state.byteCredits.get("AGENT"),
      },
      system: {
        messages: state.credits.get("SYSTEM"),
        bytes: state.byteCredits.get("SYSTEM"),
      },
    });
  }
}
// Usage
const monitor = new FlowControlMonitor(client);
Sending Flow Updates
Requesting More Credits
Copy
// Request more message credits
await client.sendFlowUpdate("USER", 10); // Request 10 more message credits
// Request more byte credits
await client.sendFlowUpdate("USER", 0, 1024 * 1024); // Request 1MB more byte credits
// Request both
await client.sendFlowUpdate("USER", 10, 1024 * 1024); // Request both message and byte credits
Automatic Credit Management
Copy
class AutomaticFlowControl {
  private thresholds = {
    messageThreshold: 5, // Request more when below 5 message credits
    byteThreshold: 512 * 1024, // Request more when below 512KB byte credits
  };
  constructor(private client: any) {
    this.setupAutomaticControl();
  }
  private setupAutomaticControl() {
    // Check flow control before sending messages
    const originalSendTextMessage = this.client.sendTextMessage.bind(
      this.client
    );
    this.client.sendTextMessage = async (
      channel: string,
      text: string,
      author?: string,
      runId?: string
    ) => {
      await this.ensureCredits(channel);
      return originalSendTextMessage(channel, text, author, runId);
    };
  }
  private async ensureCredits(channel: string) {
    const state = this.client.getConnectionState();
    const messageCredits = state.credits.get(channel) || 0;
    const byteCredits = state.byteCredits.get(channel) || 0;
    // Request more message credits if needed
    if (messageCredits < this.thresholds.messageThreshold) {
      const needed = Math.max(
        10,
        this.thresholds.messageThreshold - messageCredits
      );
      await this.client.sendFlowUpdate(channel, needed);
    }
    // Request more byte credits if needed
    if (byteCredits < this.thresholds.byteThreshold) {
      const needed = Math.max(
        1024 * 1024,
        this.thresholds.byteThreshold - byteCredits
      );
      await this.client.sendFlowUpdate(channel, 0, needed);
    }
  }
}
// Usage
const flowControl = new AutomaticFlowControl(client);
Channel Control
Pausing and Resuming Channels
Copy
// Pause message flow on a channel
await client.pauseChannel("USER");
// Resume message flow on a channel
await client.resumeChannel("USER");
Channel State Management
Copy
class ChannelManager {
  private pausedChannels = new Set<string>();
  constructor(private client: any) {
    this.setupChannelHandling();
  }
  private setupChannelHandling() {
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "PAUSE_CHANNEL") {
        this.pausedChannels.add(message.payload.channel);
        console.log(`Channel ${message.payload.channel} paused`);
      } else if (message.type === "RESUME_CHANNEL") {
        this.pausedChannels.delete(message.payload.channel);
        console.log(`Channel ${message.payload.channel} resumed`);
      }
    });
  }
  isChannelPaused(channel: string): boolean {
    return this.pausedChannels.has(channel);
  }
  async sendWithChannelCheck(channel: string, messageFn: () => Promise<void>) {
    if (this.isChannelPaused(channel)) {
      console.log(`Channel ${channel} is paused, queuing message`);
      // Queue the message for later
      this.queueMessage(channel, messageFn);
      return;
    }
    await messageFn();
  }
  private queueMessage(channel: string, messageFn: () => Promise<void>) {
    // Implementation for message queuing
    // This would store the message and send it when the channel resumes
  }
}
// Usage
const channelManager = new ChannelManager(client);
// Send message with channel check
await channelManager.sendWithChannelCheck("USER", async () => {
  await client.sendTextMessage("USER", "Hello", "user", runId);
});
Message Queuing
Credit-Aware Message Queue
Copy
class CreditAwareQueue {
  private queues = new Map<string, Array<() => Promise<void>>>();
  private processing = new Map<string, boolean>();
  constructor(private client: any) {
    this.setupFlowControlHandling();
  }
  private setupFlowControlHandling() {
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "FLOW_UPDATE") {
        const { channel, add_messages, add_bytes } = message.payload;
        if (add_messages > 0 || add_bytes > 0) {
          this.processQueue(channel);
        }
      }
    });
  }
  async enqueue(channel: string, messageFn: () => Promise<void>) {
    if (!this.queues.has(channel)) {
      this.queues.set(channel, []);
    }
    this.queues.get(channel)!.push(messageFn);
    await this.processQueue(channel);
  }
  private async processQueue(channel: string) {
    if (this.processing.get(channel)) {
      return; // Already processing
    }
    this.processing.set(channel, true);
    try {
      const queue = this.queues.get(channel) || [];
      while (queue.length > 0) {
        const state = this.client.getConnectionState();
        const messageCredits = state.credits.get(channel) || 0;
        const byteCredits = state.byteCredits.get(channel) || 0;
        // Check if we have credits to send
        if (messageCredits <= 0 || byteCredits <= 0) {
          break; // Wait for more credits
        }
        const messageFn = queue.shift();
        if (messageFn) {
          try {
            await messageFn();
          } catch (error) {
            console.error(
              `Failed to send queued message on ${channel}:`,
              error
            );
          }
        }
      }
    } finally {
      this.processing.set(channel, false);
    }
  }
  getQueueLength(channel: string): number {
    return this.queues.get(channel)?.length || 0;
  }
  clearQueue(channel: string) {
    this.queues.set(channel, []);
  }
}
// Usage
const queue = new CreditAwareQueue(client);
// Queue messages
await queue.enqueue("USER", async () => {
  await client.sendTextMessage("USER", "Message 1", "user", runId);
});
await queue.enqueue("USER", async () => {
  await client.sendTextMessage("USER", "Message 2", "user", runId);
});
// Check queue status
console.log("User channel queue length:", queue.getQueueLength("USER"));
Flow Control Strategies
Conservative Strategy
Copy
class ConservativeFlowControl {
  constructor(private client: any) {
    this.setupConservativeControl();
  }
  private setupConservativeControl() {
    // Request credits early
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "FLOW_UPDATE") {
        const { channel, add_messages, add_bytes } = message.payload;
        // If we received credits, request more when we're at 50%
        if (add_messages > 0) {
          setTimeout(async () => {
            const state = this.client.getConnectionState();
            const currentCredits = state.credits.get(channel) || 0;
            if (currentCredits <= add_messages / 2) {
              await this.client.sendFlowUpdate(channel, add_messages);
            }
          }, 1000);
        }
      }
    });
  }
}
Aggressive Strategy
Copy
class AggressiveFlowControl {
  constructor(private client: any) {
    this.setupAggressiveControl();
  }
  private setupAggressiveControl() {
    // Request maximum credits immediately
    this.client.on("connect", async () => {
      const channels = ["USER", "AGENT", "SYSTEM"];
      for (const channel of channels) {
        await this.client.sendFlowUpdate(channel, 100, 10 * 1024 * 1024);
      }
    });
  }
}
Adaptive Strategy
Copy
class AdaptiveFlowControl {
  private messageRates = new Map<string, number>();
  private lastRequestTime = new Map<string, number>();
  constructor(private client: any) {
    this.setupAdaptiveControl();
  }
  private setupAdaptiveControl() {
    // Track message rates
    this.client.on("message", (message: HAIPMessage) => {
      if (message.type === "MESSAGE_START") {
        const channel = message.payload.channel;
        const now = Date.now();
        const rate = this.messageRates.get(channel) || 0;
        // Calculate new rate (exponential moving average)
        this.messageRates.set(channel, rate * 0.9 + 0.1);
      }
    });
    // Adaptive credit requests
    setInterval(() => {
      this.requestAdaptiveCredits();
    }, 5000);
  }
  private async requestAdaptiveCredits() {
    const channels = ["USER", "AGENT", "SYSTEM"];
    for (const channel of channels) {
      const rate = this.messageRates.get(channel) || 0;
      const lastRequest = this.lastRequestTime.get(channel) || 0;
      const now = Date.now();
      // Request more credits if rate is high and we haven't requested recently
      if (rate > 0.5 && now - lastRequest > 10000) {
        const credits = Math.min(50, Math.floor(rate * 100));
        await this.client.sendFlowUpdate(channel, credits);
        this.lastRequestTime.set(channel, now);
      }
    }
  }
}
Flow Control Testing
Mock Flow Control Testing
Copy
class MockFlowControl {
  private credits = new Map<string, number>();
  private byteCredits = new Map<string, number>();
  constructor() {
    // Initialize with default credits
    ["USER", "AGENT", "SYSTEM"].forEach((channel) => {
      this.credits.set(channel, 10);
      this.byteCredits.set(channel, 1024 * 1024);
    });
  }
  consumeCredits(
    channel: string,
    messageCredits: number = 1,
    byteCredits: number = 0
  ): boolean {
    const currentCredits = this.credits.get(channel) || 0;
    const currentByteCredits = this.byteCredits.get(channel) || 0;
    if (currentCredits >= messageCredits && currentByteCredits >= byteCredits) {
      this.credits.set(channel, currentCredits - messageCredits);
      this.byteCredits.set(channel, currentByteCredits - byteCredits);
      return true;
    }
    return false;
  }
  addCredits(
    channel: string,
    messageCredits: number = 0,
    byteCredits: number = 0
  ) {
    const currentCredits = this.credits.get(channel) || 0;
    const currentByteCredits = this.byteCredits.get(channel) || 0;
    this.credits.set(channel, currentCredits + messageCredits);
    this.byteCredits.set(channel, currentByteCredits + byteCredits);
  }
  getCredits(channel: string): { messages: number; bytes: number } {
    return {
      messages: this.credits.get(channel) || 0,
      bytes: this.byteCredits.get(channel) || 0,
    };
  }
}
// Usage in tests
const mockFlowControl = new MockFlowControl();
// Test credit consumption
const canSend = mockFlowControl.consumeCredits("USER", 1, 100);
expect(canSend).toBe(true);
// Test credit exhaustion
for (let i = 0; i < 10; i++) {
  mockFlowControl.consumeCredits("USER", 1);
}
const canSendMore = mockFlowControl.consumeCredits("USER", 1);
expect(canSendMore).toBe(false);
Flow Control Integration Testing
Copy
describe("Flow Control", () => {
  test("should respect credit limits", async () => {
    const client = createHAIPClient({
      url: "ws://localhost:8080",
      token: "test-token",
      transport: "websocket",
      flowControl: {
        initialCredits: 2,
        initialCreditBytes: 1024,
      },
    });
    // Mock flow control state
    const mockState = {
      credits: new Map([["USER", 2]]),
      byteCredits: new Map([["USER", 1024]]),
    };
    jest.spyOn(client, "getConnectionState").mockReturnValue(mockState);
    // Should be able to send 2 messages
    await client.sendTextMessage("USER", "Message 1", "user", "run-1");
    await client.sendTextMessage("USER", "Message 2", "user", "run-1");
    // Third message should fail or be queued
    await expect(
      client.sendTextMessage("USER", "Message 3", "user", "run-1")
    ).rejects.toThrow("FLOW_CONTROL");
  });
});
Performance Monitoring
Flow Control Metrics
Copy
class FlowControlMetrics {
  private metrics = {
    creditRequests: 0,
    creditReceived: 0,
    messagesBlocked: 0,
    averageWaitTime: 0,
    totalWaitTime: 0,
  };
  recordCreditRequest() {
    this.metrics.creditRequests++;
  }
  recordCreditReceived(amount: number) {
    this.metrics.creditReceived += amount;
  }
  recordMessageBlocked(waitTime: number) {
    this.metrics.messagesBlocked++;
    this.metrics.totalWaitTime += waitTime;
    this.metrics.averageWaitTime =
      this.metrics.totalWaitTime / this.metrics.messagesBlocked;
  }
  getMetrics() {
    return {
      ...this.metrics,
      creditUtilization:
        this.metrics.creditReceived > 0
          ? (this.metrics.creditRequests / this.metrics.creditReceived) * 100
          : 0,
    };
  }
  reset() {
    this.metrics = {
      creditRequests: 0,
      creditReceived: 0,
      messagesBlocked: 0,
      averageWaitTime: 0,
      totalWaitTime: 0,
    };
  }
}
// Usage
const flowMetrics = new FlowControlMetrics();
// Monitor flow control performance
setInterval(() => {
  const metrics = flowMetrics.getMetrics();
  console.log("Flow control metrics:", metrics);
}, 10000);