// Credit-based back-pressure management with HAIP
/**
 * Client side of HAIP credit-based flow control over a WebSocket.
 *
 * Every channel carries a credit balance. Sending a message costs as many
 * credits as the length of its JSON-serialized payload. A message that cannot
 * be afforded, or whose channel is paused, is queued and flushed in FIFO order
 * once the server grants credits or the channel is resumed.
 */
class HAIPFlowControl {
  /** Channels that receive a credit balance and a pause flag at construction. */
  private static readonly CHANNELS = ["USER", "AGENT", "SYSTEM", "AUDIO_IN", "AUDIO_OUT"];

  // `protected` (not `private`) because subclasses build and send their own frames.
  protected ws: WebSocket | null = null;
  protected sessionId: string;
  protected seqCounter = 1;
  private credits = new Map<string, number>();
  private pendingMessages = new Map<string, PendingMessage[]>();
  private flowControlConfig: FlowControlConfig;
  private isPaused = new Map<string, boolean>();

  /** Optional observer invoked with the payload of every well-formed FLOW_UPDATE frame. */
  onFlowUpdate?: (payload: any) => void;
  /** Optional observer invoked with the payload of every `back_pressure` flow update. */
  onBackPressure?: (payload: any) => void;

  /**
   * @param url    WebSocket endpoint; the token is appended as a `token` query parameter.
   * @param token  Credential sent in the connection URL.
   * @param config Overrides merged over the built-in defaults.
   */
  constructor(
    private url: string,
    private token: string,
    config: Partial<FlowControlConfig> = {}
  ) {
    this.sessionId = this.generateUUID();
    this.flowControlConfig = {
      initialCredits: 1000,
      minCredits: 100,
      maxCredits: 10000,
      creditThreshold: 0.2,
      backPressureThreshold: 0.1,
      adaptiveAdjustment: true,
      ...config
    };
    this.initializeCredits();
  }

  /** Gives every known channel its initial balance and marks it as not paused. */
  private initializeCredits() {
    for (const channel of HAIPFlowControl.CHANNELS) {
      this.credits.set(channel, this.flowControlConfig.initialCredits);
      this.isPaused.set(channel, false);
    }
  }

  /**
   * Opens the WebSocket and sends the handshake.
   *
   * @returns A promise that resolves once the socket is open and the handshake
   *          frame has been sent, and rejects if the socket errors or closes
   *          before it ever opened.
   */
  async connect(): Promise<void> {
    const wsUrl = `${this.url}?token=${encodeURIComponent(this.token)}`;
    await new Promise<void>((resolve, reject) => {
      const socket = new WebSocket(wsUrl);
      this.ws = socket;
      let opened = false;

      socket.onopen = () => {
        opened = true;
        console.log('Connected to HAIP flow control');
        this.sendHandshake();
        resolve();
      };
      socket.onmessage = (event) => {
        let message: any;
        try {
          message = JSON.parse(event.data);
        } catch (err) {
          // One malformed frame must not take the handler down.
          console.error('Discarding malformed HAIP frame:', err);
          return;
        }
        this.handleMessage(message);
      };
      socket.onerror = (error) => {
        console.error('WebSocket error:', error);
        if (!opened) reject(new Error('WebSocket connection failed'));
      };
      socket.onclose = () => {
        console.log('Disconnected from HAIP flow control');
        if (!opened) reject(new Error('WebSocket closed before the connection was established'));
      };
    });
  }

  /**
   * Wraps `payload` in the HAIP envelope (id, session, seq, ts, channel, type)
   * and writes it to the socket.
   *
   * @throws Error when no socket exists (connect() not called, or disconnect() called).
   */
  protected sendFrame(channel: string, type: string, payload: unknown): void {
    if (!this.ws) {
      throw new Error('HAIP socket is not connected; call connect() first');
    }
    this.ws.send(JSON.stringify({
      id: this.generateUUID(),
      session: this.sessionId,
      seq: this.seqCounter++.toString(),
      ts: Date.now().toString(),
      channel,
      type,
      payload
    }));
  }

  /** Announces protocol version, accepted events and flow-control settings. */
  private sendHandshake() {
    this.sendFrame("SYSTEM", "HAI", {
      haip_version: "1.1.2",
      accept_major: [1],
      accept_events: [
        "HAI", "FLOW_UPDATE", "PAUSE_CHANNEL", "RESUME_CHANNEL",
        "MESSAGE_START", "MESSAGE_PART", "MESSAGE_END"
      ],
      flow_control: {
        enabled: true,
        initial_credits: this.flowControlConfig.initialCredits,
        adaptive: this.flowControlConfig.adaptiveAdjustment
      }
    });
  }

  /**
   * Sends a message if the channel is not paused and has enough credits;
   * otherwise queues it.
   *
   * @returns `true` when the message was written to the socket, `false` when it was queued.
   * @throws Error when a frame has to be written but no socket exists.
   */
  sendMessage(channel: string, messageType: string, payload: any): boolean {
    if (this.isPaused.get(channel)) {
      console.warn(`Channel ${channel} is paused, queuing message`);
      this.queueMessage(channel, messageType, payload);
      return false;
    }
    const currentCredits = this.credits.get(channel) ?? 0;
    const messageSize = this.calculateMessageSize(payload);
    if (currentCredits < messageSize) {
      console.warn(`Insufficient credits for channel ${channel}: ${currentCredits} < ${messageSize}`);
      this.queueMessage(channel, messageType, payload);
      this.requestMoreCredits(channel);
      return false;
    }
    // Send first, deduct second: a failed write must not burn credits.
    this.sendFrame(channel, messageType, payload);
    this.credits.set(channel, currentCredits - messageSize);
    this.checkCreditThreshold(channel);
    return true;
  }

  /** Credit cost of a payload: the length of its JSON text (0 if it does not serialize). */
  private calculateMessageSize(payload: any): number {
    // JSON.stringify yields undefined for undefined/function payloads.
    const payloadStr: string | undefined = JSON.stringify(payload);
    return payloadStr === undefined ? 0 : payloadStr.length;
  }

  /** Appends a message to the channel's FIFO queue, creating the queue on first use. */
  private queueMessage(channel: string, messageType: string, payload: any) {
    let pending = this.pendingMessages.get(channel);
    if (!pending) {
      pending = [];
      this.pendingMessages.set(channel, pending);
    }
    pending.push({ messageType, payload, timestamp: Date.now() });
    console.log(`Queued message for channel ${channel}, queue size: ${pending.length}`);
  }

  /**
   * Asks the server for more credits: at most `initialCredits`, and never more
   * than the headroom left below `maxCredits`. Sends nothing if that is <= 0.
   */
  private requestMoreCredits(channel: string) {
    const currentCredits = this.credits.get(channel) ?? 0;
    const requestedCredits = Math.min(
      this.flowControlConfig.maxCredits - currentCredits,
      this.flowControlConfig.initialCredits
    );
    if (requestedCredits > 0) {
      this.sendFrame("SYSTEM", "FLOW_UPDATE", {
        channel: channel,
        action: "request_credits",
        credits: requestedCredits,
        current_credits: currentCredits
      });
    }
  }

  /** Requests more credits once the balance falls below `initialCredits * creditThreshold`. */
  private checkCreditThreshold(channel: string) {
    const currentCredits = this.credits.get(channel) ?? 0;
    const threshold = this.flowControlConfig.initialCredits * this.flowControlConfig.creditThreshold;
    if (currentCredits < threshold) {
      this.requestMoreCredits(channel);
    }
  }

  /**
   * Flushes queued messages for a channel in FIFO order until the queue is
   * empty or the next message can no longer be afforded. Does nothing while
   * the channel is paused.
   */
  private processPendingMessages(channel: string) {
    if (this.isPaused.get(channel)) return;
    const pending = this.pendingMessages.get(channel);
    if (!pending || pending.length === 0) return;

    // Running balance: each send must see the credits left after the previous one.
    let balance = this.credits.get(channel) ?? 0;
    let sentCount = 0;
    for (const message of pending) {
      const messageSize = this.calculateMessageSize(message.payload);
      if (balance < messageSize) break;
      this.sendFrame(channel, message.messageType, message.payload);
      balance -= messageSize;
      this.credits.set(channel, balance);
      sentCount++;
      console.log(`Processed queued message for channel ${channel}`);
    }
    pending.splice(0, sentCount);
  }

  /** Marks the channel paused locally and notifies the peer. */
  pauseChannel(channel: string) {
    this.isPaused.set(channel, true);
    this.sendFrame("SYSTEM", "PAUSE_CHANNEL", {
      channel: channel,
      reason: "flow_control"
    });
    console.log(`Paused channel: ${channel}`);
  }

  /** Clears the local pause flag, notifies the peer, then flushes the queue. */
  resumeChannel(channel: string) {
    this.isPaused.set(channel, false);
    this.sendFrame("SYSTEM", "RESUME_CHANNEL", {
      channel: channel
    });
    console.log(`Resumed channel: ${channel}`);
    this.processPendingMessages(channel);
  }

  /** Dispatches an incoming frame by its `type`; other types and non-objects are ignored. */
  private handleMessage(message: any) {
    if (message === null || typeof message !== 'object') return;
    switch (message.type) {
      case 'HAI':
        console.log('Handshake completed');
        break;
      case 'FLOW_UPDATE':
        this.handleFlowUpdate(message.payload);
        break;
      case 'PAUSE_CHANNEL':
        this.handlePauseChannel(message.payload);
        break;
      case 'RESUME_CHANNEL':
        this.handleResumeChannel(message.payload);
        break;
    }
  }

  /** Applies a FLOW_UPDATE payload: `grant_credits`, `reduce_credits` or `back_pressure`. */
  private handleFlowUpdate(payload: any) {
    if (payload === null || typeof payload !== 'object') return;
    const { channel, action, credits } = payload;
    this.onFlowUpdate?.(payload);
    switch (action) {
      case 'grant_credits': {
        if (!this.isValidCreditAmount(credits)) {
          console.warn(`Ignoring grant_credits with invalid amount for channel ${channel}`);
          break;
        }
        const currentCredits = this.credits.get(channel) ?? 0;
        const newCredits = Math.min(
          currentCredits + credits,
          this.flowControlConfig.maxCredits
        );
        this.credits.set(channel, newCredits);
        console.log(`Granted ${credits} credits to channel ${channel}, total: ${newCredits}`);
        this.processPendingMessages(channel);
        break;
      }
      case 'reduce_credits': {
        if (!this.isValidCreditAmount(credits)) {
          console.warn(`Ignoring reduce_credits with invalid amount for channel ${channel}`);
          break;
        }
        const current = this.credits.get(channel) ?? 0;
        // Floor at minCredits, but never let a "reduction" raise a balance
        // that is already below the floor.
        const reduced = Math.min(
          current,
          Math.max(current - credits, this.flowControlConfig.minCredits)
        );
        this.credits.set(channel, reduced);
        console.log(`Reduced credits for channel ${channel} by ${credits}, total: ${reduced}`);
        break;
      }
      case 'back_pressure':
        this.handleBackPressure(channel, payload);
        break;
    }
  }

  /** A usable credit amount is a finite, non-negative number. */
  private isValidCreditAmount(value: unknown): value is number {
    return typeof value === 'number' && Number.isFinite(value) && value >= 0;
  }

  /** Pauses the channel when the reported pressure exceeds `backPressureThreshold`. */
  private handleBackPressure(channel: string, payload: any) {
    const { pressure_level, queue_size } = payload;
    console.log(`Back pressure detected on channel ${channel}: level ${pressure_level}, queue ${queue_size}`);
    this.onBackPressure?.(payload);
    // Skip when already paused so repeated notices do not resend PAUSE_CHANNEL.
    if (pressure_level > this.flowControlConfig.backPressureThreshold && !this.isPaused.get(channel)) {
      this.pauseChannel(channel);
    }
  }

  /** Server-initiated pause: only the local flag changes. */
  private handlePauseChannel(payload: any) {
    if (payload === null || typeof payload !== 'object') return;
    const { channel } = payload;
    this.isPaused.set(channel, true);
    console.log(`Channel ${channel} paused by server`);
  }

  /** Server-initiated resume: clears the flag and flushes the queue. */
  private handleResumeChannel(payload: any) {
    if (payload === null || typeof payload !== 'object') return;
    const { channel } = payload;
    this.isPaused.set(channel, false);
    console.log(`Channel ${channel} resumed by server`);
    this.processPendingMessages(channel);
  }

  /** Current credit balance of a channel (0 for unknown channels). */
  getCredits(channel: string): number {
    return this.credits.get(channel) ?? 0;
  }

  /** Whether the channel is currently paused (false for unknown channels). */
  isChannelPaused(channel: string): boolean {
    return this.isPaused.get(channel) ?? false;
  }

  /** Number of messages waiting in the channel's queue. */
  getPendingCount(channel: string): number {
    return this.pendingMessages.get(channel)?.length ?? 0;
  }

  /** Closes the socket, if any, and drops the reference. */
  disconnect() {
    if (this.ws) {
      this.ws.close();
      this.ws = null;
    }
  }

  /** Random UUID used for the session id and frame ids. */
  protected generateUUID(): string {
    return crypto.randomUUID();
  }
}
/** Tunables for the client-side credit accounting in `HAIPFlowControl`. */
interface FlowControlConfig {
  /**
   * Balance every channel starts with. Also advertised in the handshake and
   * used as the upper bound of a single credit request.
   */
  initialCredits: number;
  /** Floor used when the server asks the client to reduce a channel's credits. */
  minCredits: number;
  /** Ceiling a channel's balance is clamped to when credits are granted. */
  maxCredits: number;
  /**
   * Fraction of `initialCredits`; after a send, a balance below
   * `initialCredits * creditThreshold` triggers a request for more credits.
   */
  creditThreshold: number;
  /** A `back_pressure` update whose `pressure_level` exceeds this pauses the channel. */
  backPressureThreshold: number;
  /** Sent to the server as `flow_control.adaptive` in the handshake. */
  adaptiveAdjustment: boolean;
}
/** A message held back because its channel was paused or lacked credits. */
interface PendingMessage {
  /** Becomes the `type` field of the frame when the message is finally sent. */
  messageType: string;
  /** Frame payload; its JSON length is the message's credit cost. */
  payload: any;
  /** `Date.now()` at the moment the message was queued. */
  timestamp: number;
}
/**
 * Flow-control client that also tracks per-channel send statistics and, every
 * five seconds, asks the server to raise or lower a channel's credits based on
 * a score derived from those statistics.
 */
class AdaptiveFlowControl extends HAIPFlowControl {
  private performanceMetrics = new Map<string, PerformanceMetrics>();
  // ReturnType<...> instead of NodeJS.Timeout so the class also type-checks in browsers.
  private adjustmentInterval: ReturnType<typeof setInterval> | null = null;

  /** Same arguments as the base class; `adaptiveAdjustment` is always forced to `true`. */
  constructor(url: string, token: string, config: Partial<FlowControlConfig> = {}) {
    super(url, token, { ...config, adaptiveAdjustment: true });
    this.startAdaptiveAdjustment();
  }

  /** Starts the periodic adjustment timer (stopped again by `disconnect()`). */
  private startAdaptiveAdjustment() {
    this.adjustmentInterval = setInterval(() => {
      this.adjustFlowControl();
    }, 5000); // Adjust every 5 seconds
  }

  /**
   * Scores each tracked channel for the interval that just ended, requests a
   * credit change where warranted, and resets the channel's counters.
   */
  private adjustFlowControl() {
    for (const [channel, metrics] of this.performanceMetrics.entries()) {
      const currentCredits = this.getCredits(channel);
      const pendingCount = this.getPendingCount(channel);
      const hadTraffic = metrics.messagesSent + metrics.messagesQueued > 0;

      // An idle channel with an empty queue has nothing to score. Without this
      // guard its score of 0 would shrink its credits on every tick.
      if (hadTraffic || pendingCount > 0) {
        const performanceScore = this.calculatePerformanceScore(metrics);
        if (performanceScore > 0.8 && pendingCount === 0) {
          // High performance, can increase credits
          this.requestCreditAdjustment(channel, Math.floor(currentCredits * 0.1));
        } else if (performanceScore < 0.3 || pendingCount > 10) {
          // Low performance or high queue, reduce credits
          this.requestCreditAdjustment(channel, -Math.floor(currentCredits * 0.2));
        }
      }

      // Fresh object (not an in-place reset) so snapshots handed out by
      // getPerformanceMetrics() keep the values of the interval they describe.
      this.performanceMetrics.set(channel, this.emptyMetrics());
    }
  }

  /**
   * Weighted score in [0, 1]: 40% share of messages sent rather than queued,
   * 40% latency (normalized to 1 second), 20% back-pressure events
   * (normalized to 10). Returns 0 when nothing was sent.
   */
  private calculatePerformanceScore(metrics: PerformanceMetrics): number {
    const { messagesSent, messagesQueued, averageLatency, backPressureEvents } = metrics;
    if (messagesSent === 0) return 0;
    const queueRatio = messagesQueued / (messagesSent + messagesQueued);
    const latencyScore = Math.max(0, 1 - (averageLatency / 1000)); // Normalize to 1 second
    const backPressureScore = Math.max(0, 1 - (backPressureEvents / 10)); // Normalize to 10 events
    return (1 - queueRatio) * 0.4 + latencyScore * 0.4 + backPressureScore * 0.2;
  }

  /**
   * Sends a FLOW_UPDATE asking for `adjustment` more (positive) or fewer
   * (negative) credits. A zero adjustment, or a socket that is missing or not
   * open, results in no frame: this runs from a timer, where throwing would
   * surface as an uncaught exception.
   */
  private requestCreditAdjustment(channel: string, adjustment: number) {
    if (adjustment === 0) return;
    const socket = this.ws;
    if (!socket || socket.readyState !== socket.OPEN) return;

    const flowUpdate = {
      id: this.generateUUID(),
      session: this.sessionId,
      seq: this.seqCounter++.toString(),
      ts: Date.now().toString(),
      channel: "SYSTEM",
      type: "FLOW_UPDATE",
      payload: {
        channel: channel,
        action: adjustment > 0 ? "request_credits" : "reduce_credits",
        credits: Math.abs(adjustment),
        reason: "adaptive_adjustment"
      }
    };
    socket.send(JSON.stringify(flowUpdate));
  }

  /**
   * Sends (or queues) the message via the base class and records the outcome.
   * NOTE(review): the recorded latency is the duration of the synchronous
   * `sendMessage` call, not a network round trip.
   */
  override sendMessage(channel: string, messageType: string, payload: any): boolean {
    const startTime = Date.now();
    const success = super.sendMessage(channel, messageType, payload);
    this.updateMetrics(channel, success, Date.now() - startTime);
    return success;
  }

  /** All-zero counters for a new measurement interval. */
  private emptyMetrics(): PerformanceMetrics {
    return {
      messagesSent: 0,
      messagesQueued: 0,
      averageLatency: 0,
      backPressureEvents: 0
    };
  }

  /** Returns the channel's metrics, creating an empty entry on first use. */
  private metricsFor(channel: string): PerformanceMetrics {
    let metrics = this.performanceMetrics.get(channel);
    if (!metrics) {
      metrics = this.emptyMetrics();
      this.performanceMetrics.set(channel, metrics);
    }
    return metrics;
  }

  /** Counts a sent or queued message; sent messages also update the mean latency. */
  private updateMetrics(channel: string, success: boolean, latency: number) {
    const metrics = this.metricsFor(channel);
    if (success) {
      metrics.messagesSent++;
      // Incremental arithmetic mean over the messages sent this interval.
      metrics.averageLatency += (latency - metrics.averageLatency) / metrics.messagesSent;
    } else {
      metrics.messagesQueued++;
    }
  }

  /** Counts a back-pressure event, even if the channel has no metrics entry yet. */
  recordBackPressure(channel: string) {
    this.metricsFor(channel).backPressureEvents++;
  }

  /** Current-interval metrics for a channel, or `null` if it was never tracked. */
  getPerformanceMetrics(channel: string): PerformanceMetrics | null {
    return this.performanceMetrics.get(channel) ?? null;
  }

  /** Stops the adjustment timer, then closes the connection. */
  override disconnect() {
    if (this.adjustmentInterval) {
      clearInterval(this.adjustmentInterval);
      this.adjustmentInterval = null;
    }
    super.disconnect();
  }
}
/** Per-channel counters collected by `AdaptiveFlowControl` between adjustments. */
interface PerformanceMetrics {
  /** Calls to `sendMessage` that returned `true` (message written to the socket). */
  messagesSent: number;
  /** Calls to `sendMessage` that returned `false` (message queued). */
  messagesQueued: number;
  /** Smoothed duration, in milliseconds, of successful `sendMessage` calls. */
  averageLatency: number;
  /** Number of `recordBackPressure` calls counted for the channel. */
  backPressureEvents: number;
}
// Build an adaptive flow-control client with custom credit limits.
const flowControl = new AdaptiveFlowControl(
  "wss://api.haiprotocol.com/haip/websocket",
  "your-jwt-token",
  {
    initialCredits: 2000,
    minCredits: 200,
    maxCredits: 20000,
    creditThreshold: 0.3,
    backPressureThreshold: 0.15,
  }
);

// Observer hooks.
// NOTE(review): these only fire if the client class exposes and invokes
// `onFlowUpdate` / `onBackPressure` — confirm against the class definition.
flowControl.onFlowUpdate = (payload) => {
  console.log("Flow update:", payload);
};
flowControl.onBackPressure = (payload) => {
  console.log("Back pressure detected:", payload);
  flowControl.recordBackPressure(payload.channel);
};

// Open the connection before sending anything.
await flowControl.connect();

// Sends one text message on the USER channel and reports when it was queued
// instead of being written to the socket.
const sendTextMessage = (text: string) => {
  const body = {
    message_id: crypto.randomUUID(),
    author: "USER",
    text,
  };
  if (flowControl.sendMessage("USER", "MESSAGE_START", body)) {
    return;
  }
  console.log("Message queued due to insufficient credits");
};

// Push 100 messages through, printing the channel state after each one.
for (let messageNumber = 1; messageNumber <= 100; messageNumber++) {
  sendTextMessage(`Message ${messageNumber}`);

  const credits = flowControl.getCredits("USER");
  const pending = flowControl.getPendingCount("USER");
  const paused = flowControl.isChannelPaused("USER");
  console.log(`Credits: ${credits}, Pending: ${pending}, Paused: ${paused}`);

  // 100 ms pause between messages to simulate real usage.
  await new Promise((done) => setTimeout(done, 100));
}

// Print the USER channel's metrics every 10 seconds, once any exist.
setInterval(() => {
  const metrics = flowControl.getPerformanceMetrics("USER");
  if (!metrics) {
    return;
  }
  console.log("Performance metrics:", metrics);
}, 10000);
import WebSocket from 'ws';
import { v4 as uuidv4 } from 'uuid';
/**
 * Example HAIP server: accepts WebSocket connections, answers the handshake,
 * grants credits on request, echoes channel-control and regular messages, and
 * emits back-pressure notices when a session sends too many messages.
 */
class HAIPFlowControlServer {
  private wss: WebSocket.Server;
  /** Per-connection socket and outgoing sequence counter, keyed by session id. */
  private sessions = new Map<string, { ws: WebSocket; seqCounter: number }>();
  private flowControl = new Map<string, ServerFlowControl>();

  /** @param port TCP port passed to the WebSocket server. */
  constructor(port: number) {
    this.wss = new WebSocket.Server({ port });
    this.setupServer();
  }

  /** Registers per-connection state and the message / error / close handlers. */
  private setupServer() {
    this.wss.on('connection', (ws) => {
      const sessionId = uuidv4();
      this.sessions.set(sessionId, { ws, seqCounter: 1 });
      this.flowControl.set(sessionId, new ServerFlowControl());

      ws.on('message', (data) => {
        // Client input is untrusted: a malformed frame must not crash the server.
        let message: any;
        try {
          message = JSON.parse(data.toString());
        } catch (err) {
          console.warn(`Session ${sessionId}: discarding malformed frame`, err);
          return;
        }
        if (message === null || typeof message !== 'object') {
          console.warn(`Session ${sessionId}: discarding non-object frame`);
          return;
        }
        this.handleMessage(sessionId, message);
      });

      // An 'error' event with no listener is thrown by the emitter.
      ws.on('error', (err) => {
        console.error(`Session ${sessionId}: socket error`, err);
      });

      ws.on('close', () => {
        this.sessions.delete(sessionId);
        this.flowControl.delete(sessionId);
      });
    });
  }

  /** Routes a parsed frame by its `type`; unknown types count as regular messages. */
  private handleMessage(sessionId: string, message: any) {
    switch (message.type) {
      case 'HAI':
        this.handleHandshake(sessionId, message);
        break;
      case 'FLOW_UPDATE':
        this.handleFlowUpdate(sessionId, message);
        break;
      case 'PAUSE_CHANNEL':
      case 'RESUME_CHANNEL':
        this.handleChannelControl(sessionId, message);
        break;
      default:
        this.handleRegularMessage(sessionId, message);
    }
  }

  /**
   * Wraps `payload` in the HAIP envelope and writes it to the session's
   * socket. Does nothing if the session is gone or its socket is not open.
   */
  private send(sessionId: string, channel: string, type: string, payload: unknown): void {
    const session = this.sessions.get(sessionId);
    if (!session) return;
    if (session.ws.readyState !== WebSocket.OPEN) return;
    session.ws.send(JSON.stringify({
      id: uuidv4(),
      session: sessionId,
      seq: session.seqCounter++.toString(),
      ts: Date.now().toString(),
      channel,
      type,
      payload
    }));
  }

  /** Replies to the client's HAI frame with the server's capabilities. */
  private handleHandshake(sessionId: string, message: any) {
    this.send(sessionId, "SYSTEM", "HAI", {
      haip_version: "1.1.2",
      accept_major: [1],
      accept_events: [
        "HAI", "FLOW_UPDATE", "PAUSE_CHANNEL", "RESUME_CHANNEL",
        "MESSAGE_START", "MESSAGE_PART", "MESSAGE_END"
      ],
      flow_control: {
        enabled: true,
        initial_credits: 1000,
        adaptive: true
      }
    });
  }

  /**
   * Handles `request_credits` (answered with `grant_credits`) and
   * `reduce_credits`. Frames with a missing payload or an invalid credit
   * amount are ignored.
   */
  private handleFlowUpdate(sessionId: string, message: any) {
    const flowControl = this.flowControl.get(sessionId);
    if (!this.sessions.has(sessionId) || !flowControl) return;

    const payload = message.payload;
    if (payload === null || typeof payload !== 'object') return;
    const { channel, action, credits } = payload;
    if (typeof credits !== 'number' || !Number.isFinite(credits) || credits < 0) {
      console.warn(`Session ${sessionId}: ignoring FLOW_UPDATE with invalid credits`);
      return;
    }

    switch (action) {
      case 'request_credits': {
        const grantedCredits = flowControl.grantCredits(channel, credits);
        this.sendFlowUpdate(sessionId, channel, 'grant_credits', grantedCredits);
        break;
      }
      case 'reduce_credits':
        flowControl.reduceCredits(channel, credits);
        break;
    }
  }

  /** Echoes a PAUSE_CHANNEL / RESUME_CHANNEL frame back on the SYSTEM channel. */
  private handleChannelControl(sessionId: string, message: any) {
    this.send(sessionId, "SYSTEM", message.type, message.payload);
  }

  /**
   * Records the message for rate tracking, emits a back-pressure notice if
   * the channel is over its limit, then echoes the message on the AGENT channel.
   */
  private handleRegularMessage(sessionId: string, message: any) {
    const flowControl = this.flowControl.get(sessionId);
    if (!this.sessions.has(sessionId) || !flowControl) return;

    const { channel } = message;
    flowControl.recordMessage(channel);
    if (flowControl.shouldApplyBackPressure(channel)) {
      this.sendBackPressure(sessionId, channel);
    }
    this.send(sessionId, "AGENT", message.type, message.payload);
  }

  /** Sends a FLOW_UPDATE carrying `action` and `credits` for `channel`. */
  private sendFlowUpdate(sessionId: string, channel: string, action: string, credits: number) {
    this.send(sessionId, "SYSTEM", "FLOW_UPDATE", {
      channel: channel,
      action: action,
      credits: credits
    });
  }

  /** Sends a `back_pressure` FLOW_UPDATE with fixed example pressure figures. */
  private sendBackPressure(sessionId: string, channel: string) {
    this.send(sessionId, "SYSTEM", "FLOW_UPDATE", {
      channel: channel,
      action: "back_pressure",
      pressure_level: 0.8,
      queue_size: 15
    });
  }
}
/**
 * Per-session server-side bookkeeping: counts messages per channel inside a
 * fixed time window and decides how many credits to grant.
 */
class ServerFlowControl {
  private messageCounts = new Map<string, number>();
  private lastReset = Date.now();

  /**
   * @param maxMessagesPerWindow Messages a channel may send per window before
   *                             back pressure applies (default 50).
   * @param windowMs             Length of the counting window in milliseconds
   *                             (default 60000).
   * @param maxGrant             Largest number of credits granted per request
   *                             (default 1000).
   */
  constructor(
    private readonly maxMessagesPerWindow = 50,
    private readonly windowMs = 60000,
    private readonly maxGrant = 1000
  ) {}

  /** Counts one message for `channel` in the current window. */
  recordMessage(channel: string) {
    // Roll the window BEFORE counting, so the message that opens a new window
    // belongs to it instead of being wiped together with the old counts.
    const now = Date.now();
    if (now - this.lastReset > this.windowMs) {
      this.messageCounts.clear();
      this.lastReset = now;
    }
    this.messageCounts.set(channel, (this.messageCounts.get(channel) ?? 0) + 1);
  }

  /** True once the channel has exceeded `maxMessagesPerWindow` in the current window. */
  shouldApplyBackPressure(channel: string): boolean {
    return (this.messageCounts.get(channel) ?? 0) > this.maxMessagesPerWindow;
  }

  /**
   * Decides how many credits to grant for a request.
   *
   * @returns `requested` capped at `maxGrant`; 0 when `requested` is not a
   *          positive number (the value comes from the client).
   */
  grantCredits(channel: string, requested: number): number {
    if (typeof requested !== 'number' || Number.isNaN(requested) || requested <= 0) {
      return 0;
    }
    return Math.min(requested, this.maxGrant);
  }

  /** Logs a client-announced credit reduction; no state is changed. */
  reduceCredits(channel: string, amount: number) {
    console.log(`Reducing credits for channel ${channel} by ${amount}`);
  }
}
// Create the flow-control server on the chosen port and announce it.
const listenPort = 8080;
const server = new HAIPFlowControlServer(listenPort);
console.log(`HAIP Flow Control Server running on port ${listenPort}`);
Was this page helpful?