// carbon-tokenization / backend/src/agent/stream-handler.ts
// Last commit by tfrere (HF Staff): df28ae4
// "fix(agent): disable response buffering for chat streaming"
import { streamText, convertToModelMessages } from "ai";
import type { UIMessage } from "ai";
import { createOpenAICompatible } from "@ai-sdk/openai-compatible";
import type { Request, Response } from "express";
import { extractToken } from "../auth.js";
/**
 * Model used when neither the request body nor the `HF_INFERENCE_MODEL`
 * environment variable names one.
 *
 * `openai/gpt-oss-120b` is a popular tool-calling model on HF and handles
 * the editor's 18-tool agent loop well. Under the router's `:fastest`
 * policy it is served by Cerebras, a setup that has been validated
 * end-to-end (multi-turn conversations, together with the reasoning-part
 * stripping done later in this module).
 */
export const DEFAULT_MODEL = "openai/gpt-oss-120b" as const;
/**
 * Base URL of Hugging Face Inference Providers' OpenAI-compatible chat
 * completions API. The router forwards each call to one of several
 * backing providers (Cerebras, Together, Fireworks, ...).
 *
 * Why this endpoint: any HF user token carrying the `inference-api` scope
 * can call it. A forked Space therefore gets working AI features as soon
 * as the user signs in, with no separate API key to configure.
 *
 * Docs: https://huggingface.co/docs/inference-providers
 */
const HF_INFERENCE_BASE_URL = "https://router.huggingface.co/v1" as const;
/**
 * Pick the Hugging Face token that authenticates inference calls.
 *
 * Order of preference:
 * 1. The signed-in editor's OAuth token, read from the request cookies
 *    via `extractToken` (the `hf_access_token` cookie). On a HF Space this
 *    is the normal path and needs no secret configured on the server.
 * 2. The `HF_TOKEN` environment variable. This is handy for local
 *    development without OAuth, or as a server-wide default while the
 *    OAuth scope does not yet include `inference-api`.
 *
 * Empty values count as "not set".
 *
 * @returns the first token found, or `undefined` when neither source has one.
 */
function resolveHfToken(req: Request): string | undefined {
  return extractToken(req.headers.cookie) || process.env.HF_TOKEN || undefined;
}
/**
 * Build an AI SDK OpenAI-compatible provider pointed at the HF Inference
 * Providers router.
 *
 * @param apiKey - HF token used to authenticate every request made through
 *   the returned provider.
 */
function createProvider(apiKey: string) {
  const settings = {
    name: "huggingface",
    baseURL: HF_INFERENCE_BASE_URL,
    apiKey,
  };
  return createOpenAICompatible(settings);
}
/**
 * Remove `reasoning` parts from assistant messages in the history.
 *
 * Reasoning models (gpt-oss, DeepSeek R1, ...) emit a `reasoning` part
 * next to their text. The AI SDK keeps those parts in the UI message
 * history, and on the following turn sends them back to the provider as
 * `reasoning_content` on the assistant message.
 *
 * Some HF Inference Providers (notably Cerebras for gpt-oss) reject that
 * field with a 400 `wrong_api_format`. The model does not need its own
 * earlier reasoning to continue the conversation, so the safest fix is to
 * drop those parts before conversion.
 *
 * Messages that need no change (non-assistant messages, messages without
 * a `parts` array, or without reasoning parts) are returned as the same
 * object. Affected messages are shallow-copied, so the input is never
 * mutated.
 */
function stripReasoningParts(messages: UIMessage[]): UIMessage[] {
  const withoutReasoning = (message: UIMessage): UIMessage => {
    if (message.role === "assistant" && Array.isArray(message.parts)) {
      const kept = message.parts.filter((part) => part.type !== "reasoning");
      return kept.length < message.parts.length
        ? { ...message, parts: kept }
        : message;
    }
    return message;
  };
  return messages.map(withoutReasoning);
}
/** Tool set in the exact shape `streamText` accepts for its `tools` option. */
type StreamTools = Parameters<typeof streamText>[0]["tools"];

/** Per-route configuration passed to `streamChatResponse`. */
interface StreamChatOptions {
  /** System prompt sent ahead of the conversation on every call. */
  systemPrompt: string;
  /** Tools the model may call during this chat. */
  tools: StreamTools;
  /** Tag used in log lines, rendered as `[logPrefix] ...`. */
  logPrefix: string;
}
/**
 * Merge the AI SDK's response headers with the overrides that stop
 * intermediaries from buffering or re-encoding the token stream.
 *
 * Fetch `Headers` iteration yields lowercase names. The overrides are
 * therefore written in lowercase too, so they replace any
 * `cache-control` / `x-accel-buffering` the SDK already set. Otherwise
 * they would sit next to those entries as a second, differently-cased key,
 * which Node can serialise as a duplicate header line.
 *
 * @param entries - header name/value pairs from the SDK's web `Response`.
 * @returns a plain header object suitable for `res.writeHead`.
 */
function buildStreamingHeaders(
  entries: Iterable<[string, string]>,
): Record<string, string> {
  const headers: Record<string, string> = Object.fromEntries(entries);
  // Reverse proxies such as Nginx (in front of a HF Space) buffer
  // responses unless told not to.
  headers["x-accel-buffering"] = "no";
  // `no-transform` asks proxies/CDNs not to rewrite (e.g. gzip) the body.
  // On-the-fly compression holds chunks back until its window fills.
  headers["cache-control"] = "no-cache, no-transform";
  headers["content-encoding"] = "identity";
  return headers;
}

/**
 * Stream a chat completion from HF Inference Providers back to the client
 * as an AI SDK UI message stream.
 *
 * Reads `{ messages, model? }` from the request body, where `messages` is
 * the UI message history. Responds with:
 * - 400 when `messages` is missing or not an array;
 * - 500 when no HF token can be resolved, or when something fails before
 *   any bytes were sent;
 * - otherwise, the SDK's streaming response, forwarded chunk by chunk.
 *
 * If the client disconnects before the stream finishes, the upstream
 * model call is aborted so we stop consuming tokens nobody will read.
 * If the stream fails after headers were sent, the response is ended
 * rather than left hanging.
 *
 * @param req - Express request; a string `model` in the body overrides
 *   `HF_INFERENCE_MODEL` / `DEFAULT_MODEL`.
 * @param res - Express response the stream is written to.
 * @param options - system prompt, tool set and log tag for this route.
 */
export async function streamChatResponse(
  req: Request,
  res: Response,
  { systemPrompt, tools, logPrefix }: StreamChatOptions,
): Promise<void> {
  // Fires when the connection closes before we called `res.end()`,
  // i.e. the browser cancelled the request or navigated away.
  const abortController = new AbortController();
  res.on("close", () => {
    if (!res.writableEnded) abortController.abort();
  });

  try {
    // `req.body` is undefined when no body parser ran. Treat that like an
    // empty body so the caller gets the 400 below instead of a 500.
    const { messages, model } = req.body ?? {};
    if (!messages || !Array.isArray(messages)) {
      res.status(400).json({ error: "messages array is required" });
      return;
    }

    const apiKey = resolveHfToken(req);
    if (!apiKey) {
      res.status(500).json({
        error:
          "No Hugging Face token available. Sign in with your HF account " +
          "(the OAuth token is used to call Inference Providers) or set " +
          "HF_TOKEN in the backend environment.",
      });
      return;
    }

    const provider = createProvider(apiKey);
    // `model` is client-supplied: only honour it when it is a non-empty
    // string, otherwise fall back to the configured default.
    const modelId =
      (typeof model === "string" && model) ||
      process.env.HF_INFERENCE_MODEL ||
      DEFAULT_MODEL;
    const modelMessages = await convertToModelMessages(
      stripReasoningParts(messages),
    );
    const result = streamText({
      model: provider.chatModel(modelId),
      system: systemPrompt,
      messages: modelMessages,
      tools,
      abortSignal: abortController.signal,
    });
    const webResponse = result.toUIMessageStreamResponse({
      onError: (error) => {
        console.error(`[${logPrefix}] stream error:`, error);
        return error instanceof Error ? error.message : "Stream error";
      },
    });

    // ----- Disable response buffering --------------------------------------
    // The AI SDK streams chunk-by-chunk, but layers between Node and the
    // browser can re-buffer the output into one late payload, killing the
    // "typing" effect:
    //   - reverse proxies and CDNs/load balancers -> handled by the
    //     headers built in `buildStreamingHeaders`;
    //   - Nagle's algorithm on the socket coalescing small writes ->
    //     handled by `setNoDelay` below.
    // Headers are flushed eagerly so the browser commits to streaming mode
    // and renders from the first byte.
    res.writeHead(
      webResponse.status,
      buildStreamingHeaders(webResponse.headers.entries()),
    );
    res.flushHeaders?.();
    res.socket?.setNoDelay?.(true);

    const body = webResponse.body;
    if (!body) {
      res.end();
      return;
    }

    // A loop (rather than a recursive pump) keeps memory flat no matter
    // how many chunks the reply has.
    const reader = body.getReader();
    try {
      for (;;) {
        const { done, value } = await reader.read();
        if (done) break;
        if (res.destroyed) {
          // Client is gone: stop pulling from the model.
          await reader.cancel().catch(() => undefined);
          return;
        }
        res.write(value);
      }
    } finally {
      reader.releaseLock();
    }
    res.end();
  } catch (error: unknown) {
    // An abort triggered by the client disconnecting is expected and
    // there is nobody left to report to.
    if (abortController.signal.aborted) return;

    const message =
      error instanceof Error ? error.message : "Internal server error";
    console.error(`[${logPrefix}] error:`, message);
    if (!res.headersSent) {
      res.status(500).json({ error: message });
    } else if (!res.writableEnded) {
      // Failure mid-stream: headers are already out, so the best we can do
      // is close the response instead of leaving the client hanging.
      res.end();
    }
  }
}