Credit-based flow control, back-pressure management, and performance optimization in the HAIP SDK
import { createHAIPClient } from "haip-sdk";
const client = createHAIPClient({
url: "ws://localhost:8080",
token: "your-jwt-token",
transport: "websocket",
flowControl: {
initialCredits: 10, // Initial message credits per channel
initialCreditBytes: 1024 * 1024, // Initial byte credits (1MB)
maxCredits: 100, // Maximum credits per channel
maxCreditBytes: 10 * 1024 * 1024, // Maximum byte credits (10MB)
},
});
const client = createHAIPClient({
url: "ws://localhost:8080",
token: "your-jwt-token",
transport: "websocket",
flowControl: {
initialCredits: {
USER: 20,
AGENT: 10,
SYSTEM: 5,
},
initialCreditBytes: {
USER: 2 * 1024 * 1024, // 2MB for user messages
AGENT: 5 * 1024 * 1024, // 5MB for agent responses
SYSTEM: 1024 * 1024, // 1MB for system messages
},
maxCredits: {
USER: 50,
AGENT: 25,
SYSTEM: 10,
},
maxCreditBytes: {
USER: 10 * 1024 * 1024,
AGENT: 20 * 1024 * 1024,
SYSTEM: 5 * 1024 * 1024,
},
},
});
// Get current flow control state
const state = client.getConnectionState();
console.log("Flow control state:", {
userCredits: state.credits.get("USER"),
agentCredits: state.credits.get("AGENT"),
systemCredits: state.credits.get("SYSTEM"),
userBytes: state.byteCredits.get("USER"),
agentBytes: state.byteCredits.get("AGENT"),
systemBytes: state.byteCredits.get("SYSTEM"),
});
class FlowControlMonitor {
constructor(private client: any) {
this.setupMonitoring();
}
private setupMonitoring() {
// Monitor flow control updates
this.client.on("message", (message: HAIPMessage) => {
if (message.type === "FLOW_UPDATE") {
this.handleFlowUpdate(message);
}
});
// Periodic state check
setInterval(() => {
this.logFlowState();
}, 5000);
}
private handleFlowUpdate(message: HAIPMessage) {
const { channel, add_messages, add_bytes } = message.payload;
console.log(`Flow update for ${channel}:`, {
messageCredits: add_messages,
byteCredits: add_bytes,
});
}
private logFlowState() {
const state = this.client.getConnectionState();
console.log("Current flow state:", {
user: {
messages: state.credits.get("USER"),
bytes: state.byteCredits.get("USER"),
},
agent: {
messages: state.credits.get("AGENT"),
bytes: state.byteCredits.get("AGENT"),
},
system: {
messages: state.credits.get("SYSTEM"),
bytes: state.byteCredits.get("SYSTEM"),
},
});
}
}
// Usage
const monitor = new FlowControlMonitor(client);
// Request more message credits
await client.sendFlowUpdate("USER", 10); // Request 10 more message credits
// Request more byte credits
await client.sendFlowUpdate("USER", 0, 1024 * 1024); // Request 1MB more byte credits
// Request both
await client.sendFlowUpdate("USER", 10, 1024 * 1024); // Request both message and byte credits
class AutomaticFlowControl {
private thresholds = {
messageThreshold: 5, // Request more when below 5 message credits
byteThreshold: 512 * 1024, // Request more when below 512KB byte credits
};
constructor(private client: any) {
this.setupAutomaticControl();
}
private setupAutomaticControl() {
// Check flow control before sending messages
const originalSendTextMessage = this.client.sendTextMessage.bind(
this.client
);
this.client.sendTextMessage = async (
channel: string,
text: string,
author?: string,
runId?: string
) => {
await this.ensureCredits(channel);
return originalSendTextMessage(channel, text, author, runId);
};
}
private async ensureCredits(channel: string) {
const state = this.client.getConnectionState();
const messageCredits = state.credits.get(channel) || 0;
const byteCredits = state.byteCredits.get(channel) || 0;
// Request more message credits if needed
if (messageCredits < this.thresholds.messageThreshold) {
const needed = Math.max(
10,
this.thresholds.messageThreshold - messageCredits
);
await this.client.sendFlowUpdate(channel, needed);
}
// Request more byte credits if needed
if (byteCredits < this.thresholds.byteThreshold) {
const needed = Math.max(
1024 * 1024,
this.thresholds.byteThreshold - byteCredits
);
await this.client.sendFlowUpdate(channel, 0, needed);
}
}
}
// Usage
const flowControl = new AutomaticFlowControl(client);
// Pause message flow on a channel
await client.pauseChannel("USER");
// Resume message flow on a channel
await client.resumeChannel("USER");
class ChannelManager {
private pausedChannels = new Set<string>();
constructor(private client: any) {
this.setupChannelHandling();
}
private setupChannelHandling() {
this.client.on("message", (message: HAIPMessage) => {
if (message.type === "PAUSE_CHANNEL") {
this.pausedChannels.add(message.payload.channel);
console.log(`Channel ${message.payload.channel} paused`);
} else if (message.type === "RESUME_CHANNEL") {
this.pausedChannels.delete(message.payload.channel);
console.log(`Channel ${message.payload.channel} resumed`);
}
});
}
isChannelPaused(channel: string): boolean {
return this.pausedChannels.has(channel);
}
async sendWithChannelCheck(channel: string, messageFn: () => Promise<void>) {
if (this.isChannelPaused(channel)) {
console.log(`Channel ${channel} is paused, queuing message`);
// Queue the message for later
this.queueMessage(channel, messageFn);
return;
}
await messageFn();
}
private queueMessage(channel: string, messageFn: () => Promise<void>) {
// Implementation for message queuing
// This would store the message and send it when the channel resumes
}
}
// Usage
const channelManager = new ChannelManager(client);
// Send message with channel check
await channelManager.sendWithChannelCheck("USER", async () => {
await client.sendTextMessage("USER", "Hello", "user", runId);
});
class CreditAwareQueue {
private queues = new Map<string, Array<() => Promise<void>>>();
private processing = new Map<string, boolean>();
constructor(private client: any) {
this.setupFlowControlHandling();
}
private setupFlowControlHandling() {
this.client.on("message", (message: HAIPMessage) => {
if (message.type === "FLOW_UPDATE") {
const { channel, add_messages, add_bytes } = message.payload;
if (add_messages > 0 || add_bytes > 0) {
this.processQueue(channel);
}
}
});
}
async enqueue(channel: string, messageFn: () => Promise<void>) {
if (!this.queues.has(channel)) {
this.queues.set(channel, []);
}
this.queues.get(channel)!.push(messageFn);
await this.processQueue(channel);
}
private async processQueue(channel: string) {
if (this.processing.get(channel)) {
return; // Already processing
}
this.processing.set(channel, true);
try {
const queue = this.queues.get(channel) || [];
while (queue.length > 0) {
const state = this.client.getConnectionState();
const messageCredits = state.credits.get(channel) || 0;
const byteCredits = state.byteCredits.get(channel) || 0;
// Check if we have credits to send
if (messageCredits <= 0 || byteCredits <= 0) {
break; // Wait for more credits
}
const messageFn = queue.shift();
if (messageFn) {
try {
await messageFn();
} catch (error) {
console.error(
`Failed to send queued message on ${channel}:`,
error
);
}
}
}
} finally {
this.processing.set(channel, false);
}
}
getQueueLength(channel: string): number {
return this.queues.get(channel)?.length || 0;
}
clearQueue(channel: string) {
this.queues.set(channel, []);
}
}
// Usage
const queue = new CreditAwareQueue(client);
// Queue messages
await queue.enqueue("USER", async () => {
await client.sendTextMessage("USER", "Message 1", "user", runId);
});
await queue.enqueue("USER", async () => {
await client.sendTextMessage("USER", "Message 2", "user", runId);
});
// Check queue status
console.log("User channel queue length:", queue.getQueueLength("USER"));
class ConservativeFlowControl {
constructor(private client: any) {
this.setupConservativeControl();
}
private setupConservativeControl() {
// Request credits early
this.client.on("message", (message: HAIPMessage) => {
if (message.type === "FLOW_UPDATE") {
const { channel, add_messages, add_bytes } = message.payload;
// If we received credits, request more when we're at 50%
if (add_messages > 0) {
setTimeout(async () => {
const state = this.client.getConnectionState();
const currentCredits = state.credits.get(channel) || 0;
if (currentCredits <= add_messages / 2) {
await this.client.sendFlowUpdate(channel, add_messages);
}
}, 1000);
}
}
});
}
}
class AggressiveFlowControl {
constructor(private client: any) {
this.setupAggressiveControl();
}
private setupAggressiveControl() {
// Request maximum credits immediately
this.client.on("connect", async () => {
const channels = ["USER", "AGENT", "SYSTEM"];
for (const channel of channels) {
await this.client.sendFlowUpdate(channel, 100, 10 * 1024 * 1024);
}
});
}
}
class AdaptiveFlowControl {
private messageRates = new Map<string, number>();
private lastRequestTime = new Map<string, number>();
constructor(private client: any) {
this.setupAdaptiveControl();
}
private setupAdaptiveControl() {
// Track message rates
this.client.on("message", (message: HAIPMessage) => {
if (message.type === "MESSAGE_START") {
const channel = message.payload.channel;
const now = Date.now();
const rate = this.messageRates.get(channel) || 0;
// Calculate new rate (exponential moving average)
this.messageRates.set(channel, rate * 0.9 + 0.1);
}
});
// Adaptive credit requests
setInterval(() => {
this.requestAdaptiveCredits();
}, 5000);
}
private async requestAdaptiveCredits() {
const channels = ["USER", "AGENT", "SYSTEM"];
for (const channel of channels) {
const rate = this.messageRates.get(channel) || 0;
const lastRequest = this.lastRequestTime.get(channel) || 0;
const now = Date.now();
// Request more credits if rate is high and we haven't requested recently
if (rate > 0.5 && now - lastRequest > 10000) {
const credits = Math.min(50, Math.floor(rate * 100));
await this.client.sendFlowUpdate(channel, credits);
this.lastRequestTime.set(channel, now);
}
}
}
}
class MockFlowControl {
private credits = new Map<string, number>();
private byteCredits = new Map<string, number>();
constructor() {
// Initialize with default credits
["USER", "AGENT", "SYSTEM"].forEach((channel) => {
this.credits.set(channel, 10);
this.byteCredits.set(channel, 1024 * 1024);
});
}
consumeCredits(
channel: string,
messageCredits: number = 1,
byteCredits: number = 0
): boolean {
const currentCredits = this.credits.get(channel) || 0;
const currentByteCredits = this.byteCredits.get(channel) || 0;
if (currentCredits >= messageCredits && currentByteCredits >= byteCredits) {
this.credits.set(channel, currentCredits - messageCredits);
this.byteCredits.set(channel, currentByteCredits - byteCredits);
return true;
}
return false;
}
addCredits(
channel: string,
messageCredits: number = 0,
byteCredits: number = 0
) {
const currentCredits = this.credits.get(channel) || 0;
const currentByteCredits = this.byteCredits.get(channel) || 0;
this.credits.set(channel, currentCredits + messageCredits);
this.byteCredits.set(channel, currentByteCredits + byteCredits);
}
getCredits(channel: string): { messages: number; bytes: number } {
return {
messages: this.credits.get(channel) || 0,
bytes: this.byteCredits.get(channel) || 0,
};
}
}
// Usage in tests
const mockFlowControl = new MockFlowControl();
// Test credit consumption
const canSend = mockFlowControl.consumeCredits("USER", 1, 100);
expect(canSend).toBe(true);
// Test credit exhaustion
for (let i = 0; i < 10; i++) {
mockFlowControl.consumeCredits("USER", 1);
}
const canSendMore = mockFlowControl.consumeCredits("USER", 1);
expect(canSendMore).toBe(false);
describe("Flow Control", () => {
test("should respect credit limits", async () => {
const client = createHAIPClient({
url: "ws://localhost:8080",
token: "test-token",
transport: "websocket",
flowControl: {
initialCredits: 2,
initialCreditBytes: 1024,
},
});
// Mock flow control state
const mockState = {
credits: new Map([["USER", 2]]),
byteCredits: new Map([["USER", 1024]]),
};
jest.spyOn(client, "getConnectionState").mockReturnValue(mockState);
// Should be able to send 2 messages
await client.sendTextMessage("USER", "Message 1", "user", "run-1");
await client.sendTextMessage("USER", "Message 2", "user", "run-1");
// Third message should fail or be queued
await expect(
client.sendTextMessage("USER", "Message 3", "user", "run-1")
).rejects.toThrow("FLOW_CONTROL");
});
});
class FlowControlMetrics {
private metrics = {
creditRequests: 0,
creditReceived: 0,
messagesBlocked: 0,
averageWaitTime: 0,
totalWaitTime: 0,
};
recordCreditRequest() {
this.metrics.creditRequests++;
}
recordCreditReceived(amount: number) {
this.metrics.creditReceived += amount;
}
recordMessageBlocked(waitTime: number) {
this.metrics.messagesBlocked++;
this.metrics.totalWaitTime += waitTime;
this.metrics.averageWaitTime =
this.metrics.totalWaitTime / this.metrics.messagesBlocked;
}
getMetrics() {
return {
...this.metrics,
creditUtilization:
this.metrics.creditReceived > 0
? (this.metrics.creditRequests / this.metrics.creditReceived) * 100
: 0,
};
}
reset() {
this.metrics = {
creditRequests: 0,
creditReceived: 0,
messagesBlocked: 0,
averageWaitTime: 0,
totalWaitTime: 0,
};
}
}
// Usage
const flowMetrics = new FlowControlMetrics();
// Monitor flow control performance
setInterval(() => {
const metrics = flowMetrics.getMetrics();
console.log("Flow control metrics:", metrics);
}, 10000);
Was this page helpful?