New protocol version released: This page may contain outdated information.

Overview

This example demonstrates HAI Protocol’s credit-based flow control: each channel spends credits to send messages, queues messages when its credits run out, and pauses when the server signals back-pressure.

Flow Control Manager Implementation

/**
 * Client-side credit-based flow control for one HAIP WebSocket session.
 *
 * Each channel has its own credit balance, pause flag and FIFO queue of
 * messages that could not be sent yet. Sending a message costs as many credits
 * as its JSON-serialised payload has characters. A message that cannot be sent
 * (channel paused, socket not open, or not enough credits) is queued and
 * flushed later, when the server grants credits, the channel is resumed, or
 * the socket opens.
 */
class HAIPFlowControl {
  /** Optional observer, called with the payload of every inbound FLOW_UPDATE that names a channel. */
  onFlowUpdate?: (payload: any) => void;

  /** Optional observer, called with the payload of every inbound `back_pressure` update. */
  onBackPressure?: (payload: any) => void;

  // Subclasses build their own protocol frames, so the socket, session id,
  // sequence counter and id generator are protected rather than private.
  protected ws: WebSocket | null = null;
  protected sessionId: string;
  protected seqCounter = 1;

  private credits = new Map<string, number>();
  private pendingMessages = new Map<string, PendingMessage[]>();
  private flowControlConfig: FlowControlConfig;
  private isPaused = new Map<string, boolean>();

  /**
   * @param url    WebSocket endpoint; the token is appended as a `token` query parameter.
   * @param token  Credential sent in the connection URL.
   * @param config Overrides for the default flow control settings.
   */
  constructor(
    private url: string,
    private token: string,
    config: Partial<FlowControlConfig> = {}
  ) {
    this.sessionId = this.generateUUID();
    this.flowControlConfig = {
      initialCredits: 1000,
      minCredits: 100,
      maxCredits: 10000,
      creditThreshold: 0.2,
      backPressureThreshold: 0.1,
      adaptiveAdjustment: true,
      ...config
    };

    this.initializeCredits();
  }

  /** Gives every known channel its initial credit balance and marks it as not paused. */
  private initializeCredits() {
    for (const channel of ["USER", "AGENT", "SYSTEM", "AUDIO_IN", "AUDIO_OUT"]) {
      this.credits.set(channel, this.flowControlConfig.initialCredits);
      this.isPaused.set(channel, false);
    }
  }

  /**
   * Opens the WebSocket and sends the handshake.
   *
   * The returned promise settles only once the socket has opened (resolve) or
   * has failed/closed before opening (reject), so callers can safely send
   * right after `await connect()`.
   */
  async connect(): Promise<void> {
    // Do not clobber a query string that is already part of the endpoint URL.
    const separator = this.url.includes('?') ? '&' : '?';
    const wsUrl = `${this.url}${separator}token=${encodeURIComponent(this.token)}`;
    const socket = new WebSocket(wsUrl);
    this.ws = socket;

    await new Promise<void>((resolve, reject) => {
      let opened = false;

      socket.onopen = () => {
        opened = true;
        console.log('Connected to HAIP flow control');
        this.sendHandshake();
        // Messages queued while the socket was not open can go out now.
        for (const channel of this.pendingMessages.keys()) {
          this.processPendingMessages(channel);
        }
        resolve();
      };

      socket.onmessage = (event) => {
        let message: unknown;
        try {
          message = JSON.parse(event.data);
        } catch (error) {
          // A malformed frame must not take down the whole message handler.
          console.error('Discarding malformed HAIP frame:', error);
          return;
        }
        this.handleMessage(message);
      };

      socket.onerror = (error) => {
        console.error('WebSocket error:', error);
        if (!opened) {
          reject(new Error('HAIP WebSocket connection failed'));
        }
      };

      socket.onclose = () => {
        console.log('Disconnected from HAIP flow control');
        if (!opened) {
          reject(new Error('HAIP WebSocket closed before it opened'));
        }
      };
    });
  }

  /** True when a socket exists and is in the OPEN state. */
  protected isConnected(): boolean {
    const socket = this.ws;
    return socket !== null && socket.readyState === socket.OPEN;
  }

  /**
   * Wraps `payload` in a HAIP envelope and writes it to the socket.
   *
   * @returns false (nothing sent, sequence counter untouched) when the socket is not open.
   */
  private transmit(channel: string, type: string, payload: unknown): boolean {
    const socket = this.ws;
    if (socket === null || socket.readyState !== socket.OPEN) {
      return false;
    }

    socket.send(JSON.stringify({
      id: this.generateUUID(),
      session: this.sessionId,
      // Parentheses are required: `x++.toString()` is not valid syntax.
      seq: (this.seqCounter++).toString(),
      ts: Date.now().toString(),
      channel,
      type,
      payload
    }));
    return true;
  }

  /** Announces protocol version, accepted events and flow control settings to the server. */
  private sendHandshake() {
    this.transmit("SYSTEM", "HAI", {
      haip_version: "1.1.2",
      accept_major: [1],
      accept_events: [
        "HAI", "FLOW_UPDATE", "PAUSE_CHANNEL", "RESUME_CHANNEL",
        "MESSAGE_START", "MESSAGE_PART", "MESSAGE_END"
      ],
      flow_control: {
        enabled: true,
        initial_credits: this.flowControlConfig.initialCredits,
        adaptive: this.flowControlConfig.adaptiveAdjustment
      }
    });
  }

  /**
   * Sends a message on `channel` if flow control allows it, otherwise queues it.
   *
   * @returns true when the message was written to the socket; false when it
   *          was queued (channel paused, socket not open, or too few credits).
   */
  sendMessage(channel: string, messageType: string, payload: any): boolean {
    if (this.isPaused.get(channel)) {
      console.warn(`Channel ${channel} is paused, queuing message`);
      this.queueMessage(channel, messageType, payload);
      return false;
    }

    if (!this.isConnected()) {
      console.warn(`Not connected, queuing message for channel ${channel}`);
      this.queueMessage(channel, messageType, payload);
      return false;
    }

    const currentCredits = this.credits.get(channel) ?? 0;
    const messageSize = this.calculateMessageSize(payload);

    if (currentCredits < messageSize) {
      console.warn(`Insufficient credits for channel ${channel}: ${currentCredits} < ${messageSize}`);
      this.queueMessage(channel, messageType, payload);
      this.requestMoreCredits(channel);
      return false;
    }

    // Charge the credits only after the frame has actually been written.
    this.transmit(channel, messageType, payload);
    this.credits.set(channel, currentCredits - messageSize);

    // Ask for a top-up early if the balance is getting low.
    this.checkCreditThreshold(channel);

    return true;
  }

  /**
   * Credit cost of a payload: the length of its JSON text in UTF-16 code units.
   * A payload that has no JSON representation (e.g. `undefined`) costs 0.
   */
  private calculateMessageSize(payload: any): number {
    const payloadStr: string | undefined = JSON.stringify(payload);
    return payloadStr === undefined ? 0 : payloadStr.length;
  }

  /** Appends a message to the channel's FIFO queue, stamping it with the current time. */
  private queueMessage(channel: string, messageType: string, payload: any) {
    let pending = this.pendingMessages.get(channel);
    if (!pending) {
      pending = [];
      this.pendingMessages.set(channel, pending);
    }

    pending.push({
      messageType,
      payload,
      timestamp: Date.now()
    });

    console.log(`Queued message for channel ${channel}, queue size: ${pending.length}`);
  }

  /**
   * Asks the server for more credits on `channel`.
   * The request is capped at `initialCredits` and at the headroom left below
   * `maxCredits`; nothing is sent when that is zero or the socket is not open.
   */
  private requestMoreCredits(channel: string) {
    const currentCredits = this.credits.get(channel) ?? 0;
    const requestedCredits = Math.min(
      this.flowControlConfig.maxCredits - currentCredits,
      this.flowControlConfig.initialCredits
    );

    if (requestedCredits > 0) {
      this.transmit("SYSTEM", "FLOW_UPDATE", {
        channel: channel,
        action: "request_credits",
        credits: requestedCredits,
        current_credits: currentCredits
      });
    }
  }

  /** Requests more credits once the balance drops below `initialCredits * creditThreshold`. */
  private checkCreditThreshold(channel: string) {
    const currentCredits = this.credits.get(channel) ?? 0;
    const threshold = this.flowControlConfig.initialCredits * this.flowControlConfig.creditThreshold;

    if (currentCredits < threshold) {
      this.requestMoreCredits(channel);
    }
  }

  /**
   * Sends queued messages for `channel`, oldest first, for as long as the
   * remaining credits cover the next message. Does nothing while the channel
   * is paused or the socket is not open.
   */
  private processPendingMessages(channel: string) {
    const pending = this.pendingMessages.get(channel);
    if (!pending || pending.length === 0) return;
    if (this.isPaused.get(channel) || !this.isConnected()) return;

    // Running balance: every sent message must lower what the next one may use.
    let remaining = this.credits.get(channel) ?? 0;
    let sentCount = 0;

    while (pending.length > 0) {
      const next = pending[0];
      const cost = this.calculateMessageSize(next.payload);
      if (remaining < cost) break;

      if (!this.transmit(channel, next.messageType, next.payload)) break;

      // Dequeue only after a successful write so nothing is lost or duplicated.
      pending.shift();
      remaining -= cost;
      this.credits.set(channel, remaining);
      sentCount++;

      console.log(`Processed queued message for channel ${channel}`);
    }

    if (pending.length > 0) {
      // Still blocked on credits: ask again so the queue cannot stall.
      this.requestMoreCredits(channel);
    } else if (sentCount > 0) {
      this.checkCreditThreshold(channel);
    }
  }

  /** Pauses `channel` locally and, when connected, tells the server with a PAUSE_CHANNEL frame. */
  pauseChannel(channel: string) {
    this.isPaused.set(channel, true);

    const notified = this.transmit("SYSTEM", "PAUSE_CHANNEL", {
      channel: channel,
      reason: "flow_control"
    });
    if (!notified) {
      console.warn(`Not connected, PAUSE_CHANNEL for ${channel} was not sent`);
    }

    console.log(`Paused channel: ${channel}`);
  }

  /** Resumes `channel`, tells the server when connected, then flushes the channel's queue. */
  resumeChannel(channel: string) {
    this.isPaused.set(channel, false);

    const notified = this.transmit("SYSTEM", "RESUME_CHANNEL", {
      channel: channel
    });
    if (!notified) {
      console.warn(`Not connected, RESUME_CHANNEL for ${channel} was not sent`);
    }

    console.log(`Resumed channel: ${channel}`);

    // Process any pending messages
    this.processPendingMessages(channel);
  }

  /** Dispatches one parsed inbound frame by its `type`; anything unrecognised is ignored. */
  private handleMessage(message: any) {
    if (message === null || typeof message !== 'object') return;

    switch (message.type) {
      case 'HAI':
        console.log('Handshake completed');
        break;
      case 'FLOW_UPDATE':
        this.handleFlowUpdate(message.payload);
        break;
      case 'PAUSE_CHANNEL':
        this.handlePauseChannel(message.payload);
        break;
      case 'RESUME_CHANNEL':
        this.handleResumeChannel(message.payload);
        break;
    }
  }

  /** Applies a server FLOW_UPDATE: credit grant, credit reduction or back-pressure signal. */
  private handleFlowUpdate(payload: any) {
    if (!payload || typeof payload.channel !== 'string') {
      console.warn('Ignoring FLOW_UPDATE without a channel');
      return;
    }

    this.onFlowUpdate?.(payload);

    const { channel, action, credits } = payload;
    // A missing or non-numeric amount would turn the balance into NaN.
    const validAmount = typeof credits === 'number' && Number.isFinite(credits) && credits >= 0;

    switch (action) {
      case 'grant_credits': {
        if (!validAmount) {
          console.warn(`Ignoring grant_credits with invalid amount for channel ${channel}`);
          break;
        }
        const before = this.credits.get(channel) ?? 0;
        // Cap at maxCredits, but a grant must never lower the balance.
        const after = Math.max(before, Math.min(before + credits, this.flowControlConfig.maxCredits));
        this.credits.set(channel, after);

        console.log(`Granted ${credits} credits to channel ${channel}, total: ${after}`);

        // Only retry the queue when the balance actually grew.
        if (after > before) {
          this.processPendingMessages(channel);
        }
        break;
      }

      case 'reduce_credits': {
        if (!validAmount) {
          console.warn(`Ignoring reduce_credits with invalid amount for channel ${channel}`);
          break;
        }
        const before = this.credits.get(channel) ?? 0;
        // Floor at minCredits, but a reduction must never raise the balance.
        const after = Math.min(before, Math.max(before - credits, this.flowControlConfig.minCredits));
        this.credits.set(channel, after);

        console.log(`Reduced credits for channel ${channel} by ${credits}, total: ${after}`);
        break;
      }

      case 'back_pressure':
        this.handleBackPressure(channel, payload);
        break;
    }
  }

  /** Reports back-pressure to the observer and pauses the channel when the level exceeds the threshold. */
  private handleBackPressure(channel: string, payload: any) {
    const { pressure_level, queue_size } = payload;

    console.log(`Back pressure detected on channel ${channel}: level ${pressure_level}, queue ${queue_size}`);

    this.onBackPressure?.(payload);

    // Skip the pause (and its PAUSE_CHANNEL frame) when the channel is already paused.
    if (pressure_level > this.flowControlConfig.backPressureThreshold && !this.isPaused.get(channel)) {
      this.pauseChannel(channel);
    }
  }

  /** Marks a channel as paused on the server's request. */
  private handlePauseChannel(payload: any) {
    const channel = payload?.channel;
    if (typeof channel !== 'string') return;

    this.isPaused.set(channel, true);
    console.log(`Channel ${channel} paused by server`);
  }

  /** Marks a channel as resumed on the server's request and flushes its queue. */
  private handleResumeChannel(payload: any) {
    const channel = payload?.channel;
    if (typeof channel !== 'string') return;

    this.isPaused.set(channel, false);
    console.log(`Channel ${channel} resumed by server`);
    this.processPendingMessages(channel);
  }

  /** Current credit balance of `channel` (0 for an unknown channel). */
  getCredits(channel: string): number {
    return this.credits.get(channel) ?? 0;
  }

  /** Whether `channel` is currently paused (false for an unknown channel). */
  isChannelPaused(channel: string): boolean {
    return this.isPaused.get(channel) ?? false;
  }

  /** Number of messages waiting in the queue of `channel`. */
  getPendingCount(channel: string): number {
    return this.pendingMessages.get(channel)?.length ?? 0;
  }

  /** Closes the socket, if any. Queued messages and credit balances are kept. */
  disconnect() {
    if (this.ws) {
      this.ws.close();
      this.ws = null;
    }
  }

  /** Generates a random UUID for message and session ids. */
  protected generateUUID(): string {
    return crypto.randomUUID();
  }
}

/** Tunable settings for the client-side flow control manager. */
interface FlowControlConfig {
  /** Starting credit balance of every channel; also the upper bound of a single credit request. */
  initialCredits: number;
  /** Floor used when the server asks the client to reduce a channel's credits. */
  minCredits: number;
  /** Ceiling for a channel's credit balance after a grant. */
  maxCredits: number;
  /** Fraction of `initialCredits` below which the client asks for more credits. */
  creditThreshold: number;
  /** A back-pressure update with a `pressure_level` above this value pauses the channel. */
  backPressureThreshold: number;
  /** Sent to the server as `flow_control.adaptive` in the handshake. */
  adaptiveAdjustment: boolean;
}

/** A message waiting in a channel's queue until it can be sent. */
interface PendingMessage {
  /** HAIP message type to use in the envelope when the message is finally sent. */
  messageType: string;
  /** Payload exactly as passed to `sendMessage`. */
  payload: any;
  /** `Date.now()` value captured when the message was queued. */
  timestamp: number;
}

Adaptive Flow Control Implementation

/**
 * Flow control client that additionally collects per-channel send statistics
 * and, every five seconds, asks the server to raise or lower a channel's
 * credits based on them.
 *
 * NOTE(review): this class reads `ws`, `sessionId`, `seqCounter` and
 * `generateUUID` from the base class; they have to be declared `protected`
 * (not `private`) there for this to type-check.
 */
class AdaptiveFlowControl extends HAIPFlowControl {
  private performanceMetrics = new Map<string, PerformanceMetrics>();
  // ReturnType keeps this correct in both browser (number) and Node (Timeout) builds.
  private adjustmentInterval: ReturnType<typeof setInterval> | null = null;

  /** Same arguments as the base class; `adaptiveAdjustment` is always forced to true. */
  constructor(url: string, token: string, config: Partial<FlowControlConfig> = {}) {
    super(url, token, { ...config, adaptiveAdjustment: true });
    this.startAdaptiveAdjustment();
  }

  /** Starts the periodic adjustment timer; `disconnect()` stops it. */
  private startAdaptiveAdjustment() {
    this.adjustmentInterval = setInterval(() => {
      this.adjustFlowControl();
    }, 5000); // Adjust every 5 seconds
  }

  /**
   * Scores each channel that has metrics and requests a 10% credit increase
   * (score above 0.8 and empty queue) or a 20% decrease (score below 0.3 or
   * more than 10 queued messages), then clears the metrics for the next
   * interval.
   */
  private adjustFlowControl() {
    // The timer starts in the constructor, i.e. before connect(); without an
    // open socket there is nobody to send an adjustment to, so keep the
    // metrics and try again on the next tick.
    const socket = this.ws;
    if (socket === null || socket.readyState !== socket.OPEN) return;

    for (const [channel, metrics] of this.performanceMetrics.entries()) {
      const currentCredits = this.getCredits(channel);
      const pendingCount = this.getPendingCount(channel);

      // A channel that sent nothing and queued nothing carries no signal;
      // scoring it would yield 0 and shrink its credits just for being idle.
      const idle =
        metrics.messagesSent === 0 && metrics.messagesQueued === 0 && pendingCount === 0;

      if (!idle) {
        const performanceScore = this.calculatePerformanceScore(metrics);

        if (performanceScore > 0.8 && pendingCount === 0) {
          // High performance, can increase credits
          const increase = Math.floor(currentCredits * 0.1);
          this.requestCreditAdjustment(channel, increase);
        } else if (performanceScore < 0.3 || pendingCount > 10) {
          // Low performance or high queue, reduce credits
          const decrease = Math.floor(currentCredits * 0.2);
          this.requestCreditAdjustment(channel, -decrease);
        }
      }

      // Reset metrics for next interval
      this.performanceMetrics.set(channel, this.emptyMetrics());
    }
  }

  /** A metrics record with every counter at zero. */
  private emptyMetrics(): PerformanceMetrics {
    return {
      messagesSent: 0,
      messagesQueued: 0,
      averageLatency: 0,
      backPressureEvents: 0
    };
  }

  /**
   * Combines queue ratio (40%), latency (40%, normalised to one second) and
   * back-pressure events (20%, normalised to ten events) into a score in [0, 1].
   * Returns 0 when nothing was sent in the interval.
   */
  private calculatePerformanceScore(metrics: PerformanceMetrics): number {
    const { messagesSent, messagesQueued, averageLatency, backPressureEvents } = metrics;

    if (messagesSent === 0) return 0;

    const queueRatio = messagesQueued / (messagesSent + messagesQueued);
    const latencyScore = Math.max(0, 1 - (averageLatency / 1000)); // Normalize to 1 second
    const backPressureScore = Math.max(0, 1 - (backPressureEvents / 10)); // Normalize to 10 events

    return (1 - queueRatio) * 0.4 + latencyScore * 0.4 + backPressureScore * 0.2;
  }

  /**
   * Sends a FLOW_UPDATE asking for `adjustment` more credits (positive) or
   * fewer credits (negative) on `channel`. A zero adjustment sends nothing.
   */
  private requestCreditAdjustment(channel: string, adjustment: number) {
    // Covers both 0 and -0 (e.g. a 20% decrease of a balance below 5).
    if (adjustment === 0) return;

    const socket = this.ws;
    if (socket === null || socket.readyState !== socket.OPEN) return;

    const flowUpdate = {
      id: this.generateUUID(),
      session: this.sessionId,
      // Parentheses are required: `x++.toString()` is not valid syntax.
      seq: (this.seqCounter++).toString(),
      ts: Date.now().toString(),
      channel: "SYSTEM",
      type: "FLOW_UPDATE",
      payload: {
        channel: channel,
        action: adjustment > 0 ? "request_credits" : "reduce_credits",
        credits: Math.abs(adjustment),
        reason: "adaptive_adjustment"
      }
    };

    socket.send(JSON.stringify(flowUpdate));
  }

  /**
   * Sends through the base class and records the outcome. The recorded
   * latency is the wall-clock time spent inside the base `sendMessage` call.
   */
  override sendMessage(channel: string, messageType: string, payload: any): boolean {
    const startTime = Date.now();
    const success = super.sendMessage(channel, messageType, payload);

    // Update performance metrics
    this.updateMetrics(channel, success, Date.now() - startTime);

    return success;
  }

  /** Returns the metrics record of `channel`, creating an empty one on first use. */
  private metricsFor(channel: string): PerformanceMetrics {
    let metrics = this.performanceMetrics.get(channel);
    if (!metrics) {
      metrics = this.emptyMetrics();
      this.performanceMetrics.set(channel, metrics);
    }
    return metrics;
  }

  /** Counts one sent or queued message and folds the latency of sent ones into the mean. */
  private updateMetrics(channel: string, success: boolean, latency: number) {
    const metrics = this.metricsFor(channel);

    if (success) {
      metrics.messagesSent++;
      // Incremental arithmetic mean over the messages sent in this interval.
      metrics.averageLatency += (latency - metrics.averageLatency) / metrics.messagesSent;
    } else {
      metrics.messagesQueued++;
    }
  }

  /** Counts one back-pressure event for `channel`, even if it has no metrics yet. */
  recordBackPressure(channel: string) {
    this.metricsFor(channel).backPressureEvents++;
  }

  /** Metrics collected for `channel` since the last adjustment, or null if none exist. */
  getPerformanceMetrics(channel: string): PerformanceMetrics | null {
    return this.performanceMetrics.get(channel) ?? null;
  }

  /** Stops the adjustment timer, then closes the connection. */
  override disconnect() {
    if (this.adjustmentInterval !== null) {
      clearInterval(this.adjustmentInterval);
      this.adjustmentInterval = null;
    }
    super.disconnect();
  }
}

/** Per-channel counters collected between two adaptive adjustment runs. */
interface PerformanceMetrics {
  /** Number of `sendMessage` calls that returned true. */
  messagesSent: number;
  /** Number of `sendMessage` calls that returned false (message was queued). */
  messagesQueued: number;
  /** Latency figure in milliseconds, derived from the time spent in successful `sendMessage` calls. */
  averageLatency: number;
  /** Number of back-pressure events recorded via `recordBackPressure`. */
  backPressureEvents: number;
}

Usage Example

// Build an adaptive flow control client with custom credit limits
const flowControl = new AdaptiveFlowControl(
  "wss://api.haiprotocol.com/haip/websocket",
  "your-jwt-token",
  {
    initialCredits: 2000,
    minCredits: 200,
    maxCredits: 20000,
    creditThreshold: 0.3,
    backPressureThreshold: 0.15,
  }
);

// Log every flow update the server sends
flowControl.onFlowUpdate = (update) => {
  console.log("Flow update:", update);
};

// Log back-pressure signals and feed them into the adaptive metrics
flowControl.onBackPressure = (signal) => {
  console.log("Back pressure detected:", signal);
  flowControl.recordBackPressure(signal.channel);
};

// Open the connection before sending anything
await flowControl.connect();

// Sends one text message on the USER channel and reports when it had to be queued
function sendTextMessage(text: string): void {
  const delivered = flowControl.sendMessage("USER", "MESSAGE_START", {
    message_id: crypto.randomUUID(),
    author: "USER",
    text: text,
  });

  if (delivered) {
    return;
  }

  console.log("Message queued due to insufficient credits");
}

// Fire off a burst of 100 messages, printing the flow control state after each one
for (let messageNumber = 1; messageNumber <= 100; messageNumber++) {
  sendTextMessage(`Message ${messageNumber}`);

  const remainingCredits = flowControl.getCredits("USER");
  const queuedCount = flowControl.getPendingCount("USER");
  const channelPaused = flowControl.isChannelPaused("USER");

  console.log(`Credits: ${remainingCredits}, Pending: ${queuedCount}, Paused: ${channelPaused}`);

  // Space the sends 100 ms apart to resemble real traffic
  await new Promise((wake) => setTimeout(wake, 100));
}

// Print the USER channel metrics every 10 seconds, when there are any
setInterval(() => {
  const snapshot = flowControl.getPerformanceMetrics("USER");
  if (!snapshot) {
    return;
  }
  console.log("Performance metrics:", snapshot);
}, 10000);

Server Implementation

import WebSocket from 'ws';
import { v4 as uuidv4 } from 'uuid';

/**
 * Minimal HAIP WebSocket server that demonstrates the server side of
 * credit-based flow control: it answers the handshake, grants credits on
 * request, echoes channel control frames and regular messages, and emits
 * back-pressure updates when a session's flow control asks for it.
 */
class HAIPFlowControlServer {
  private wss: WebSocket.Server;
  /** Per-session socket and outgoing sequence counter, keyed by session id. */
  private sessions = new Map<string, { ws: WebSocket; seqCounter: number }>();
  /** Per-session flow control state, keyed by session id. */
  private flowControl = new Map<string, ServerFlowControl>();

  /** @param port TCP port the WebSocket server listens on. */
  constructor(port: number) {
    this.wss = new WebSocket.Server({ port });
    this.setupServer();
  }

  /** Registers the connection handler that creates and tears down sessions. */
  private setupServer() {
    this.wss.on('connection', (ws) => {
      const sessionId = uuidv4();
      this.sessions.set(sessionId, { ws, seqCounter: 1 });
      this.flowControl.set(sessionId, new ServerFlowControl());

      ws.on('message', (data) => {
        let message: unknown;
        try {
          message = JSON.parse(data.toString());
        } catch (error) {
          // Client input is untrusted: one bad frame must not crash the process.
          console.error(`Session ${sessionId}: discarding malformed frame`, error);
          return;
        }
        this.handleMessage(sessionId, message);
      });

      // An 'error' event without a listener is thrown by the emitter.
      ws.on('error', (error) => {
        console.error(`Session ${sessionId}: socket error`, error);
      });

      ws.on('close', () => {
        this.sessions.delete(sessionId);
        this.flowControl.delete(sessionId);
      });
    });
  }

  /**
   * Wraps `payload` in a HAIP envelope and sends it to the session's socket.
   *
   * @returns false when the session is unknown or its socket is not open.
   */
  private sendEnvelope(sessionId: string, channel: string, type: string, payload: unknown): boolean {
    const session = this.sessions.get(sessionId);
    if (!session || session.ws.readyState !== session.ws.OPEN) return false;

    session.ws.send(JSON.stringify({
      id: uuidv4(),
      session: sessionId,
      // Parentheses are required: `x++.toString()` is not valid syntax.
      seq: (session.seqCounter++).toString(),
      ts: Date.now().toString(),
      channel,
      type,
      payload
    }));
    return true;
  }

  /** Routes one parsed client frame by its `type`; unknown types are treated as regular messages. */
  private handleMessage(sessionId: string, message: any) {
    if (message === null || typeof message !== 'object') {
      console.warn(`Session ${sessionId}: ignoring non-object frame`);
      return;
    }

    switch (message.type) {
      case 'HAI':
        this.handleHandshake(sessionId, message);
        break;
      case 'FLOW_UPDATE':
        this.handleFlowUpdate(sessionId, message);
        break;
      case 'PAUSE_CHANNEL':
      case 'RESUME_CHANNEL':
        this.handleChannelControl(sessionId, message);
        break;
      default:
        this.handleRegularMessage(sessionId, message);
    }
  }

  /** Answers the client's handshake with the server's version, events and flow control settings. */
  private handleHandshake(sessionId: string, message: any) {
    this.sendEnvelope(sessionId, "SYSTEM", "HAI", {
      haip_version: "1.1.2",
      accept_major: [1],
      accept_events: [
        "HAI", "FLOW_UPDATE", "PAUSE_CHANNEL", "RESUME_CHANNEL",
        "MESSAGE_START", "MESSAGE_PART", "MESSAGE_END"
      ],
      flow_control: {
        enabled: true,
        initial_credits: 1000,
        adaptive: true
      }
    });
  }

  /** Handles a client FLOW_UPDATE: grants requested credits or records a reduction. */
  private handleFlowUpdate(sessionId: string, message: any) {
    const flowControl = this.flowControl.get(sessionId);
    if (!this.sessions.has(sessionId) || !flowControl) return;

    const payload = message.payload;
    if (!payload || typeof payload.channel !== 'string') {
      console.warn(`Session ${sessionId}: ignoring FLOW_UPDATE without a channel`);
      return;
    }

    const { channel, action, credits } = payload;
    // The amount comes from the client; reject anything that is not a sane number.
    if (typeof credits !== 'number' || !Number.isFinite(credits) || credits < 0) {
      console.warn(`Session ${sessionId}: ignoring FLOW_UPDATE with invalid credits`);
      return;
    }

    switch (action) {
      case 'request_credits': {
        const grantedCredits = flowControl.grantCredits(channel, credits);
        this.sendFlowUpdate(sessionId, channel, 'grant_credits', grantedCredits);
        break;
      }

      case 'reduce_credits':
        flowControl.reduceCredits(channel, credits);
        break;
    }
  }

  /** Acknowledges PAUSE_CHANNEL / RESUME_CHANNEL by echoing the frame back on the SYSTEM channel. */
  private handleChannelControl(sessionId: string, message: any) {
    this.sendEnvelope(sessionId, "SYSTEM", message.type, message.payload);
  }

  /**
   * Records a regular message for flow control, emits back-pressure if the
   * session's flow control asks for it, then echoes the message on AGENT.
   */
  private handleRegularMessage(sessionId: string, message: any) {
    const flowControl = this.flowControl.get(sessionId);
    if (!this.sessions.has(sessionId) || !flowControl) return;

    const { channel } = message;
    if (typeof channel !== 'string') {
      console.warn(`Session ${sessionId}: ignoring message without a channel`);
      return;
    }

    // Update flow control metrics
    flowControl.recordMessage(channel);

    // Check for back pressure
    if (flowControl.shouldApplyBackPressure(channel)) {
      this.sendBackPressure(sessionId, channel);
    }

    // Echo message back
    this.sendEnvelope(sessionId, "AGENT", message.type, message.payload);
  }

  /** Sends a FLOW_UPDATE with the given action and credit amount for `channel`. */
  private sendFlowUpdate(sessionId: string, channel: string, action: string, credits: number) {
    this.sendEnvelope(sessionId, "SYSTEM", "FLOW_UPDATE", {
      channel: channel,
      action: action,
      credits: credits
    });
  }

  /** Sends a `back_pressure` FLOW_UPDATE for `channel` with fixed demo values. */
  private sendBackPressure(sessionId: string, channel: string) {
    this.sendEnvelope(sessionId, "SYSTEM", "FLOW_UPDATE", {
      channel: channel,
      action: "back_pressure",
      pressure_level: 0.8,
      queue_size: 15
    });
  }
}

/**
 * Per-session server-side flow control state: counts messages per channel in
 * a fixed time window and decides how many credits to grant.
 */
class ServerFlowControl {
  private messageCounts = new Map<string, number>();
  private lastReset = Date.now();

  /**
   * @param maxMessagesPerWindow Message count per channel above which back pressure applies.
   * @param windowMs             Length of the counting window in milliseconds.
   * @param maxGrant             Largest number of credits handed out per request.
   */
  constructor(
    private readonly maxMessagesPerWindow = 50,
    private readonly windowMs = 60000,
    private readonly maxGrant = 1000
  ) {}

  /** Clears all counters once the current window has elapsed. */
  private rollWindow() {
    const now = Date.now();
    if (now - this.lastReset > this.windowMs) {
      this.messageCounts.clear();
      this.lastReset = now;
    }
  }

  /** Counts one message on `channel` in the current window. */
  recordMessage(channel: string) {
    // Roll the window first so the message that opens a new window is counted
    // in it instead of being wiped together with the old counters.
    this.rollWindow();

    const current = this.messageCounts.get(channel) ?? 0;
    this.messageCounts.set(channel, current + 1);
  }

  /** True when `channel` has exceeded the per-window message limit. */
  shouldApplyBackPressure(channel: string): boolean {
    this.rollWindow();
    const count = this.messageCounts.get(channel) ?? 0;
    return count > this.maxMessagesPerWindow;
  }

  /**
   * Decides how many of the requested credits to grant.
   *
   * @returns the request clamped to the range [0, maxGrant]; 0 for a
   *          non-finite or non-numeric request.
   */
  grantCredits(channel: string, requested: number): number {
    // The amount originates from the client, so never trust it blindly.
    if (!Number.isFinite(requested)) return 0;
    return Math.min(Math.max(requested, 0), this.maxGrant);
  }

  /** Logs a credit reduction requested by the client. */
  reduceCredits(channel: string, amount: number) {
    console.log(`Reducing credits for channel ${channel} by ${amount}`);
  }
}

// Launch the server; the port is named once so the log line always matches it
const listenPort = 8080;
const server = new HAIPFlowControlServer(listenPort);
console.log(`HAIP Flow Control Server running on port ${listenPort}`);

Key Features

Credit-based Flow Control

Configurable credit limits for managing message flow and preventing system overload.

Adaptive Credit Adjustment

Dynamic credit adjustment based on real-time performance metrics and system capacity.

Back-pressure Detection

Automatic detection of back-pressure conditions with intelligent channel pausing.

Message Queuing

Intelligent message queuing when credits are insufficient to maintain system stability.

Performance Monitoring

Real-time latency and throughput tracking for optimal flow control decisions.

Channel-specific Control

Independent credit balances, pause state, and message queues for each channel (USER, AGENT, SYSTEM, AUDIO_IN, AUDIO_OUT).

Automatic Recovery

Seamless recovery when back-pressure conditions subside: queued messages are flushed automatically once the server grants more credits or resumes a paused channel.

System Protection

Built-in protection against overwhelming the system with excessive message rates.

This example demonstrates how to implement robust flow control for HAI Protocol to prevent overwhelming the system and ensure optimal performance.