1187 lines
36 KiB
TypeScript
1187 lines
36 KiB
TypeScript
import { describe, it, expect, vi } from "vitest";
|
|
import { PendingRequestMap, buildCallHandler } from "../src/call.js";
|
|
import { CallError, InfrastructureErrorCode } from "../src/error.js";
|
|
import { OperationRegistry } from "../src/registry.js";
|
|
import { Type } from "@alkdev/typebox";
|
|
import { OperationType } from "../src/types.js";
|
|
import type { Identity } from "../src/types.js";
|
|
import { localEnvelope, httpEnvelope, mcpEnvelope, isResponseEnvelope } from "../src/response-envelope.js";
|
|
import type { ResponseEnvelope } from "../src/response-envelope.js";
|
|
|
|
describe("PendingRequestMap", () => {
|
|
it("creates instance without event target", () => {
|
|
const map = new PendingRequestMap();
|
|
expect(map.getPendingCount()).toBe(0);
|
|
});
|
|
|
|
it("creates instance with event target", () => {
|
|
const target = new EventTarget();
|
|
const map = new PendingRequestMap(target);
|
|
expect(map.getPendingCount()).toBe(0);
|
|
});
|
|
|
|
it("call() resolves when respond() is called with a ResponseEnvelope", async () => {
|
|
const map = new PendingRequestMap();
|
|
|
|
const callPromise = map.call("test.op", { value: "hello" });
|
|
|
|
setTimeout(() => {
|
|
const requestId = [...map["entries"].keys()][0];
|
|
map.respond(requestId, localEnvelope({ result: "world" }, "test.op"));
|
|
}, 10);
|
|
|
|
const result = await callPromise;
|
|
expect(isResponseEnvelope(result)).toBe(true);
|
|
expect(result.meta.source).toBe("local");
|
|
expect(result.data).toEqual({ result: "world" });
|
|
});
|
|
|
|
it("call() resolves with the full ResponseEnvelope", async () => {
|
|
const map = new PendingRequestMap();
|
|
|
|
const callPromise = map.call("test.op", { value: "hello" });
|
|
|
|
const envelope = httpEnvelope({ status: "ok" }, {
|
|
statusCode: 200,
|
|
headers: { "content-type": "application/json" },
|
|
contentType: "application/json",
|
|
});
|
|
|
|
setTimeout(() => {
|
|
const requestId = [...map["entries"].keys()][0];
|
|
map.respond(requestId, envelope);
|
|
}, 10);
|
|
|
|
const result = await callPromise;
|
|
expect(result.meta.source).toBe("http");
|
|
expect(result.data).toEqual({ status: "ok" });
|
|
if (result.meta.source === "http") {
|
|
expect(result.meta.statusCode).toBe(200);
|
|
}
|
|
});
|
|
|
|
it("respond() throws when called with non-envelope value", () => {
|
|
const map = new PendingRequestMap();
|
|
expect(() => map.respond("req-1", { result: "world" } as any)).toThrow("ResponseEnvelope");
|
|
});
|
|
|
|
it("call() rejects when emitError() is called", async () => {
|
|
const map = new PendingRequestMap();
|
|
|
|
const callPromise = map.call("test.op", { value: "hello" });
|
|
|
|
setTimeout(() => {
|
|
const requestId = [...map["entries"].keys()][0];
|
|
map.emitError(requestId, "CUSTOM_ERROR", "Something went wrong");
|
|
}, 10);
|
|
|
|
await expect(callPromise).rejects.toThrow("Something went wrong");
|
|
await expect(callPromise).rejects.toBeInstanceOf(CallError);
|
|
});
|
|
|
|
it("abort() rejects the pending call", async () => {
|
|
const map = new PendingRequestMap();
|
|
|
|
const callPromise = map.call("test.op", { value: "hello" });
|
|
|
|
setTimeout(() => {
|
|
const requestId = [...map["entries"].keys()][0];
|
|
map.abort(requestId);
|
|
}, 10);
|
|
|
|
await expect(callPromise).rejects.toThrow("was aborted");
|
|
await expect(callPromise).rejects.toBeInstanceOf(CallError);
|
|
});
|
|
|
|
it("call() with deadline times out", async () => {
|
|
const map = new PendingRequestMap();
|
|
|
|
const deadline = Date.now() + 50;
|
|
const callPromise = map.call("test.op", { value: "hello" }, { deadline });
|
|
|
|
await expect(callPromise).rejects.toThrow("timed out");
|
|
await expect(callPromise).rejects.toBeInstanceOf(CallError);
|
|
});
|
|
|
|
it("tracks pending requests", () => {
|
|
const map = new PendingRequestMap();
|
|
map.call("test.op1", {});
|
|
map.call("test.op2", {});
|
|
expect(map.getPendingCount()).toBe(2);
|
|
});
|
|
|
|
it("cleans up after call resolves", async () => {
|
|
const map = new PendingRequestMap();
|
|
const callPromise = map.call("test.op", { value: "hello" });
|
|
expect(map.getPendingCount()).toBe(1);
|
|
|
|
const requestId = [...map["entries"].keys()][0];
|
|
map.respond(requestId, localEnvelope({ result: "done" }, "test.op"));
|
|
|
|
await callPromise;
|
|
expect(map.getPendingCount()).toBe(0);
|
|
});
|
|
|
|
it("respond() throws when called with a non-envelope value", () => {
|
|
const map = new PendingRequestMap();
|
|
expect(() => map.respond("r1", { result: "raw" } as any)).toThrow(
|
|
"PendingRequestMap.respond() requires a ResponseEnvelope"
|
|
);
|
|
});
|
|
|
|
it("respond() throws when called with a null value", () => {
|
|
const map = new PendingRequestMap();
|
|
expect(() => map.respond("r1", null as any)).toThrow(
|
|
"PendingRequestMap.respond() requires a ResponseEnvelope"
|
|
);
|
|
});
|
|
|
|
it("respond() throws when called with a string value", () => {
|
|
const map = new PendingRequestMap();
|
|
expect(() => map.respond("r1", "raw string" as any)).toThrow(
|
|
"PendingRequestMap.respond() requires a ResponseEnvelope"
|
|
);
|
|
});
|
|
|
|
it("respond() accepts a localEnvelope", () => {
|
|
const map = new PendingRequestMap();
|
|
const callPromise = map.call("test.op", {});
|
|
const requestId = [...map["entries"].keys()][0];
|
|
|
|
expect(() => map.respond(requestId, localEnvelope("data", "test.op"))).not.toThrow();
|
|
});
|
|
|
|
it("respond() accepts an httpEnvelope", () => {
|
|
const map = new PendingRequestMap();
|
|
const callPromise = map.call("test.op", {});
|
|
const requestId = [...map["entries"].keys()][0];
|
|
|
|
expect(() => map.respond(requestId, httpEnvelope("data", {
|
|
statusCode: 200,
|
|
headers: {},
|
|
contentType: "text/plain",
|
|
}))).not.toThrow();
|
|
});
|
|
|
|
it("respond() accepts an mcpEnvelope", () => {
|
|
const map = new PendingRequestMap();
|
|
const callPromise = map.call("test.op", {});
|
|
const requestId = [...map["entries"].keys()][0];
|
|
|
|
expect(() => map.respond(requestId, mcpEnvelope("data", {
|
|
isError: false,
|
|
content: [],
|
|
}))).not.toThrow();
|
|
});
|
|
|
|
it("call() returns Promise<ResponseEnvelope>", async () => {
|
|
const map = new PendingRequestMap();
|
|
|
|
const callPromise = map.call("test.op", { value: "hello" });
|
|
|
|
setTimeout(() => {
|
|
const requestId = [...map["entries"].keys()][0];
|
|
map.respond(requestId, localEnvelope(42, "test.op"));
|
|
}, 10);
|
|
|
|
const result = await callPromise;
|
|
expect(isResponseEnvelope(result)).toBe(true);
|
|
expect(result.data).toBe(42);
|
|
expect(result.meta.source).toBe("local");
|
|
});
|
|
});
|
|
|
|
describe("CallHandler", () => {
|
|
function makeRegistry(ops?: Array<Record<string, unknown>>) {
|
|
const registry = new OperationRegistry();
|
|
registry.register({
|
|
name: "echo",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.QUERY,
|
|
description: "echo op",
|
|
inputSchema: Type.Object({ value: Type.String() }),
|
|
outputSchema: Type.Object({ value: Type.String() }),
|
|
accessControl: { requiredScopes: [] },
|
|
handler: async (input: unknown) => ({ value: (input as { value: string }).value }),
|
|
});
|
|
registry.register({
|
|
name: "voidOp",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.QUERY,
|
|
description: "void op",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Unknown(),
|
|
accessControl: { requiredScopes: [] },
|
|
handler: async () => undefined,
|
|
});
|
|
registry.register({
|
|
name: "guarded",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.QUERY,
|
|
description: "guarded op",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Object({ ok: Type.Boolean() }),
|
|
accessControl: {
|
|
requiredScopes: [],
|
|
resourceType: "project",
|
|
resourceAction: "read",
|
|
},
|
|
handler: async () => ({ ok: true }),
|
|
});
|
|
registry.register({
|
|
name: "open",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.QUERY,
|
|
description: "open op",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Object({ ok: Type.Boolean() }),
|
|
accessControl: { requiredScopes: [] },
|
|
handler: async () => ({ ok: true }),
|
|
});
|
|
|
|
if (ops) {
|
|
for (const op of ops) {
|
|
registry.register(op as any);
|
|
}
|
|
}
|
|
return registry;
|
|
}
|
|
|
|
it("wraps handler return value in localEnvelope and publishes call.responded", async () => {
|
|
const registry = makeRegistry();
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const callPromise = callMap.call("test.echo", { value: "hello" });
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.echo",
|
|
input: { value: "hello" },
|
|
});
|
|
|
|
const result = await callPromise;
|
|
expect(result.meta.source).toBe("local");
|
|
if (result.meta.source === "local") {
|
|
expect(result.meta.operationId).toBe("test.echo");
|
|
expect(typeof result.meta.timestamp).toBe("number");
|
|
}
|
|
expect(result.data).toEqual({ value: "hello" });
|
|
});
|
|
|
|
it("wraps undefined handler return value and publishes call.responded", async () => {
|
|
const registry = makeRegistry();
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const callPromise = callMap.call("test.voidOp", {});
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.voidOp",
|
|
input: {},
|
|
});
|
|
|
|
const result = await callPromise;
|
|
expect(result.meta.source).toBe("local");
|
|
if (result.meta.source === "local") {
|
|
expect(result.meta.operationId).toBe("test.voidOp");
|
|
}
|
|
expect(result.data).toBeUndefined();
|
|
});
|
|
|
|
it("passes through a pre-built ResponseEnvelope from handler", async () => {
|
|
const registry = new OperationRegistry();
|
|
const mcpResult = mcpEnvelope({ structured: true }, {
|
|
isError: false,
|
|
content: [{ type: "text" as const, text: "result" }],
|
|
structuredContent: { structured: true },
|
|
});
|
|
registry.register({
|
|
name: "mcpOp",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.QUERY,
|
|
description: "mcp op",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Unknown(),
|
|
accessControl: { requiredScopes: [] },
|
|
handler: async () => mcpResult,
|
|
});
|
|
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const callPromise = callMap.call("test.mcpOp", {});
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.mcpOp",
|
|
input: {},
|
|
});
|
|
|
|
const result = await callPromise;
|
|
expect(result.meta.source).toBe("mcp");
|
|
expect(result.data).toEqual({ structured: true });
|
|
});
|
|
|
|
it("passes through httpEnvelope from handler", async () => {
|
|
const registry = new OperationRegistry();
|
|
const httpResult = httpEnvelope({ items: [1, 2, 3] }, {
|
|
statusCode: 200,
|
|
headers: { "content-type": "application/json" },
|
|
contentType: "application/json",
|
|
});
|
|
registry.register({
|
|
name: "httpOp",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.QUERY,
|
|
description: "http op",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Unknown(),
|
|
accessControl: { requiredScopes: [] },
|
|
handler: async () => httpResult,
|
|
});
|
|
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const callPromise = callMap.call("test.httpOp", {});
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.httpOp",
|
|
input: {},
|
|
});
|
|
|
|
const result = await callPromise;
|
|
expect(result.meta.source).toBe("http");
|
|
if (result.meta.source === "http") {
|
|
expect(result.meta.statusCode).toBe(200);
|
|
}
|
|
expect(result.data).toEqual({ items: [1, 2, 3] });
|
|
});
|
|
|
|
it("publishes call.error when handler throws", async () => {
|
|
const registry = new OperationRegistry();
|
|
registry.register({
|
|
name: "throws",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.QUERY,
|
|
description: "throws op",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Unknown(),
|
|
accessControl: { requiredScopes: [] },
|
|
handler: async () => { throw new Error("handler failed"); },
|
|
});
|
|
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const callPromise = callMap.call("test.throws", {});
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.throws",
|
|
input: {},
|
|
});
|
|
|
|
await expect(callPromise).rejects.toThrow("handler failed");
|
|
});
|
|
|
|
it("publishes call.error with CallError when handler throws CallError", async () => {
|
|
const registry = new OperationRegistry();
|
|
registry.register({
|
|
name: "throwsCallError",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.QUERY,
|
|
description: "throws call error",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Unknown(),
|
|
accessControl: { requiredScopes: [] },
|
|
handler: async () => {
|
|
throw new CallError("CUSTOM_CODE", "custom error message", { detail: "info" });
|
|
},
|
|
});
|
|
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const callPromise = callMap.call("test.throwsCallError", {});
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.throwsCallError",
|
|
input: {},
|
|
});
|
|
|
|
try {
|
|
await callPromise;
|
|
expect.fail("Should have thrown");
|
|
} catch (error) {
|
|
expect(error).toBeInstanceOf(CallError);
|
|
expect((error as CallError).code).toBe("CUSTOM_CODE");
|
|
expect((error as CallError).message).toBe("custom error message");
|
|
}
|
|
});
|
|
|
|
it("publishes call.error when operation not found", async () => {
|
|
const registry = new OperationRegistry();
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const callPromise = callMap.call("test.nonexistent", {});
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.nonexistent",
|
|
input: {},
|
|
});
|
|
|
|
try {
|
|
await callPromise;
|
|
expect.fail("Should have thrown");
|
|
} catch (error) {
|
|
expect(error).toBeInstanceOf(CallError);
|
|
expect((error as CallError).code).toBe(InfrastructureErrorCode.OPERATION_NOT_FOUND);
|
|
}
|
|
});
|
|
|
|
it("publishes call.error when handler not registered", async () => {
|
|
const registry = new OperationRegistry();
|
|
registry.registerSpec({
|
|
name: "noHandler",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.QUERY,
|
|
description: "op without handler",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Unknown(),
|
|
accessControl: { requiredScopes: [] },
|
|
});
|
|
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const callPromise = callMap.call("test.noHandler", {});
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.noHandler",
|
|
input: {},
|
|
});
|
|
|
|
try {
|
|
await callPromise;
|
|
expect.fail("Should have thrown");
|
|
} catch (error) {
|
|
expect(error).toBeInstanceOf(CallError);
|
|
expect((error as CallError).code).toBe(InfrastructureErrorCode.OPERATION_NOT_FOUND);
|
|
}
|
|
});
|
|
|
|
it("publishes call.error on access denied", async () => {
|
|
const registry = makeRegistry();
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const identity: Identity = { id: "user1", scopes: [] };
|
|
|
|
const callPromise = callMap.call("test.guarded", {}, { identity });
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.guarded",
|
|
input: {},
|
|
identity,
|
|
});
|
|
|
|
try {
|
|
await callPromise;
|
|
expect.fail("Should have thrown");
|
|
} catch (error) {
|
|
expect(error).toBeInstanceOf(CallError);
|
|
expect((error as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED);
|
|
}
|
|
});
|
|
|
|
it("publishes call.error for unknown operations with mapError defaults", async () => {
|
|
const registry = new OperationRegistry();
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const callPromise = callMap.call("test.nonexistent", {});
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.nonexistent",
|
|
input: {},
|
|
});
|
|
|
|
try {
|
|
await callPromise;
|
|
expect.fail("Should have thrown");
|
|
} catch (error) {
|
|
expect(error).toBeInstanceOf(CallError);
|
|
expect((error as CallError).code).toBe(InfrastructureErrorCode.OPERATION_NOT_FOUND);
|
|
}
|
|
});
|
|
|
|
it("applies Value.Cast normalization via execute()", async () => {
|
|
const registry = new OperationRegistry();
|
|
registry.register({
|
|
name: "defaultsFields",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.QUERY,
|
|
description: "op with default fields",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Object({ name: Type.String(), count: Type.Number({ default: 0 }) }),
|
|
accessControl: { requiredScopes: [] },
|
|
handler: async () => ({ name: "test" }),
|
|
});
|
|
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const callPromise = callMap.call("test.defaultsFields", {});
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.defaultsFields",
|
|
input: {},
|
|
});
|
|
|
|
const result = await callPromise;
|
|
expect(result.data).toEqual({ name: "test", count: 0 });
|
|
});
|
|
|
|
it("does not normalize with Value.Cast when outputSchema is Unknown", async () => {
|
|
const registry = new OperationRegistry();
|
|
registry.register({
|
|
name: "unknownOutput",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.QUERY,
|
|
description: "op with unknown output",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Unknown(),
|
|
accessControl: { requiredScopes: [] },
|
|
handler: async () => ({ name: "test", extra: "field" }),
|
|
});
|
|
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const callPromise = callMap.call("test.unknownOutput", {});
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.unknownOutput",
|
|
input: {},
|
|
});
|
|
|
|
const result = await callPromise;
|
|
expect(result.data).toEqual({ name: "test", extra: "field" });
|
|
});
|
|
|
|
it("maps errors using spec errorSchemas", async () => {
|
|
const registry = new OperationRegistry();
|
|
registry.register({
|
|
name: "customError",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.QUERY,
|
|
description: "op with custom errors",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Unknown(),
|
|
accessControl: { requiredScopes: [] },
|
|
errorSchemas: [
|
|
{ code: "NOT_FOUND", description: "Item not found", schema: Type.Object({ id: Type.String() }) },
|
|
],
|
|
handler: async () => { throw new Error("NOT_FOUND: item 42"); },
|
|
});
|
|
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const callPromise = callMap.call("test.customError", {});
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.customError",
|
|
input: {},
|
|
});
|
|
|
|
try {
|
|
await callPromise;
|
|
expect.fail("Should have thrown");
|
|
} catch (error) {
|
|
expect(error).toBeInstanceOf(CallError);
|
|
expect((error as CallError).code).toBe("NOT_FOUND");
|
|
expect((error as CallError).message).toBe("NOT_FOUND: item 42");
|
|
}
|
|
});
|
|
});
|
|
|
|
describe("checkAccess resource access control", () => {
|
|
function makeRegistry(accessControlOverrides: Record<string, unknown> = {}) {
|
|
const registry = new OperationRegistry();
|
|
registry.register({
|
|
name: "guarded",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.QUERY,
|
|
description: "guarded op",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Object({ ok: Type.Boolean() }),
|
|
accessControl: {
|
|
requiredScopes: [],
|
|
resourceType: "project",
|
|
resourceAction: "read",
|
|
...accessControlOverrides,
|
|
},
|
|
handler: async () => ({ ok: true }),
|
|
});
|
|
registry.register({
|
|
name: "open",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.QUERY,
|
|
description: "open op",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Object({ ok: Type.Boolean() }),
|
|
accessControl: {
|
|
requiredScopes: [],
|
|
},
|
|
handler: async () => ({ ok: true }),
|
|
});
|
|
return registry;
|
|
}
|
|
|
|
it("denies access when resourceType/resourceAction are set and identity.resources is undefined", async () => {
|
|
const registry = makeRegistry();
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const identity: Identity = { id: "user1", scopes: [] };
|
|
|
|
const callPromise = callMap.call("test.guarded", {}, { identity });
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.guarded",
|
|
input: {},
|
|
identity,
|
|
});
|
|
|
|
try {
|
|
await callPromise;
|
|
expect.fail("Should have thrown");
|
|
} catch (error) {
|
|
expect(error).toBeInstanceOf(CallError);
|
|
expect((error as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED);
|
|
}
|
|
});
|
|
|
|
it("denies access when resourceType/resourceAction are set and identity.resources is empty", async () => {
|
|
const registry = makeRegistry();
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const identity: Identity = { id: "user1", scopes: [], resources: {} };
|
|
|
|
const callPromise = callMap.call("test.guarded", {}, { identity });
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.guarded",
|
|
input: {},
|
|
identity,
|
|
});
|
|
|
|
try {
|
|
await callPromise;
|
|
expect.fail("Should have thrown");
|
|
} catch (error) {
|
|
expect(error).toBeInstanceOf(CallError);
|
|
expect((error as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED);
|
|
}
|
|
});
|
|
|
|
it("denies access when identity.resources has no matching resource type", async () => {
|
|
const registry = makeRegistry();
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const identity: Identity = {
|
|
id: "user1",
|
|
scopes: [],
|
|
resources: { "document:abc": ["read"] },
|
|
};
|
|
|
|
const callPromise = callMap.call("test.guarded", {}, { identity });
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.guarded",
|
|
input: {},
|
|
identity,
|
|
});
|
|
|
|
try {
|
|
await callPromise;
|
|
expect.fail("Should have thrown");
|
|
} catch (error) {
|
|
expect(error).toBeInstanceOf(CallError);
|
|
expect((error as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED);
|
|
}
|
|
});
|
|
|
|
it("denies access when identity.resources has matching type but wrong action", async () => {
|
|
const registry = makeRegistry();
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const identity: Identity = {
|
|
id: "user1",
|
|
scopes: [],
|
|
resources: { "project:abc": ["write"] },
|
|
};
|
|
|
|
const callPromise = callMap.call("test.guarded", {}, { identity });
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.guarded",
|
|
input: {},
|
|
identity,
|
|
});
|
|
|
|
try {
|
|
await callPromise;
|
|
expect.fail("Should have thrown");
|
|
} catch (error) {
|
|
expect(error).toBeInstanceOf(CallError);
|
|
expect((error as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED);
|
|
}
|
|
});
|
|
|
|
it("grants access when identity.resources has matching type and action", async () => {
|
|
const registry = makeRegistry();
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const identity: Identity = {
|
|
id: "user1",
|
|
scopes: [],
|
|
resources: { "project:abc": ["read"] },
|
|
};
|
|
|
|
const callPromise = callMap.call("test.guarded", {}, { identity });
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.guarded",
|
|
input: {},
|
|
identity,
|
|
});
|
|
|
|
const result = await callPromise;
|
|
expect(result.data).toEqual({ ok: true });
|
|
});
|
|
|
|
it("grants access when neither resourceType nor resourceAction are set", async () => {
|
|
const registry = makeRegistry();
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const identity: Identity = { id: "user1", scopes: [] };
|
|
|
|
const callPromise = callMap.call("test.open", {}, { identity });
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.open",
|
|
input: {},
|
|
identity,
|
|
});
|
|
|
|
const result = await callPromise;
|
|
expect(result.data).toEqual({ ok: true });
|
|
});
|
|
|
|
it("grants access when identity.resources matches and identity has no scopes required", async () => {
|
|
const registry = makeRegistry();
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const identity: Identity = {
|
|
id: "user1",
|
|
scopes: ["some:scope"],
|
|
resources: { "project:xyz": ["read", "write"] },
|
|
};
|
|
|
|
const callPromise = callMap.call("test.guarded", {}, { identity });
|
|
|
|
await handler({
|
|
requestId: [...callMap["entries"].keys()][0],
|
|
operationId: "test.guarded",
|
|
input: {},
|
|
identity,
|
|
});
|
|
|
|
const result = await callPromise;
|
|
expect(result.data).toEqual({ ok: true });
|
|
});
|
|
});
|
|
|
|
describe("PendingRequestMap.subscribe()", () => {
|
|
it("yields each envelope from call.responded events", async () => {
|
|
const map = new PendingRequestMap();
|
|
|
|
const subscribeIter = map.subscribe("test.stream", { filter: "all" });
|
|
|
|
const results: ResponseEnvelope[] = [];
|
|
const consumePromise = (async () => {
|
|
for await (const envelope of subscribeIter) {
|
|
results.push(envelope);
|
|
if (results.length === 3) break;
|
|
}
|
|
})();
|
|
|
|
await new Promise((r) => setTimeout(r, 20));
|
|
|
|
const requestId = [...map["entries"].keys()][0];
|
|
map.respond(requestId, localEnvelope({ event: 1 }, "test.stream"));
|
|
map.respond(requestId, localEnvelope({ event: 2 }, "test.stream"));
|
|
map.respond(requestId, localEnvelope({ event: 3 }, "test.stream"));
|
|
|
|
await consumePromise;
|
|
|
|
expect(results).toHaveLength(3);
|
|
expect(results[0].data).toEqual({ event: 1 });
|
|
expect(results[1].data).toEqual({ event: 2 });
|
|
expect(results[2].data).toEqual({ event: 3 });
|
|
expect(results[0].meta.source).toBe("local");
|
|
});
|
|
|
|
it("publishes call.aborted when consumer stops iterating", async () => {
|
|
const map = new PendingRequestMap();
|
|
|
|
const subscribeIter = map.subscribe("test.stream", {});
|
|
|
|
let abortedReceived = false;
|
|
const abortedIter = map["pubsub"].subscribe("call.aborted", "");
|
|
(async () => {
|
|
for await (const envelope of abortedIter) {
|
|
abortedReceived = true;
|
|
}
|
|
})();
|
|
|
|
const results: ResponseEnvelope[] = [];
|
|
const consumePromise = (async () => {
|
|
for await (const envelope of subscribeIter) {
|
|
results.push(envelope);
|
|
if (results.length === 1) break;
|
|
}
|
|
})();
|
|
|
|
await new Promise((r) => setTimeout(r, 20));
|
|
const requestId = [...map["entries"].keys()][0];
|
|
map.respond(requestId, localEnvelope("first", "test.stream"));
|
|
|
|
await consumePromise;
|
|
await new Promise((r) => setTimeout(r, 20));
|
|
|
|
expect(abortedReceived).toBe(true);
|
|
});
|
|
|
|
it("throws CallError when call.error event arrives", async () => {
|
|
const map = new PendingRequestMap();
|
|
|
|
const subscribeIter = map.subscribe("test.failing", {});
|
|
let caughtError: unknown;
|
|
|
|
const consumePromise = (async () => {
|
|
try {
|
|
for await (const _ of subscribeIter) {
|
|
// should not reach
|
|
}
|
|
} catch (error) {
|
|
caughtError = error;
|
|
}
|
|
})();
|
|
|
|
await new Promise((r) => setTimeout(r, 20));
|
|
const requestId = [...map["entries"].keys()][0];
|
|
map.emitError(requestId, "CUSTOM_ERROR", "Subscription failed");
|
|
|
|
await consumePromise;
|
|
|
|
expect(caughtError).toBeInstanceOf(CallError);
|
|
expect((caughtError as CallError).code).toBe("CUSTOM_ERROR");
|
|
});
|
|
|
|
it("closes iterator on call.aborted event", async () => {
|
|
const map = new PendingRequestMap();
|
|
|
|
const subscribeIter = map.subscribe("test.stream", {});
|
|
let iterationCompleted = false;
|
|
|
|
const consumePromise = (async () => {
|
|
for await (const _ of subscribeIter) {
|
|
// will receive abort
|
|
}
|
|
iterationCompleted = true;
|
|
})();
|
|
|
|
await new Promise((r) => setTimeout(r, 20));
|
|
const requestId = [...map["entries"].keys()][0];
|
|
|
|
const entry = map["entries"].get(requestId);
|
|
expect(entry).toBeDefined();
|
|
expect(entry!.type).toBe("subscribe");
|
|
|
|
map["pubsub"].publish("call.aborted", "", { requestId });
|
|
|
|
await consumePromise;
|
|
expect(iterationCompleted).toBe(true);
|
|
});
|
|
|
|
it("times out on idle timeout", async () => {
|
|
const map = new PendingRequestMap();
|
|
const idleTimeout = 80;
|
|
|
|
const subscribeIter = map.subscribe("test.slow", {}, { idleTimeout });
|
|
let caughtError: unknown;
|
|
|
|
const consumePromise = (async () => {
|
|
try {
|
|
for await (const _ of subscribeIter) {
|
|
// should not receive any events
|
|
}
|
|
} catch (error) {
|
|
caughtError = error;
|
|
}
|
|
})();
|
|
|
|
await consumePromise;
|
|
|
|
expect(caughtError).toBeInstanceOf(CallError);
|
|
expect((caughtError as CallError).code).toBe(InfrastructureErrorCode.TIMEOUT);
|
|
});
|
|
|
|
it("resets idle timeout on each envelope", async () => {
|
|
const map = new PendingRequestMap();
|
|
const idleTimeout = 150;
|
|
|
|
const subscribeIter = map.subscribe("test.heartbeat", {}, { idleTimeout });
|
|
|
|
const results: ResponseEnvelope[] = [];
|
|
const consumePromise = (async () => {
|
|
for await (const envelope of subscribeIter) {
|
|
results.push(envelope);
|
|
if (results.length === 3) break;
|
|
}
|
|
})();
|
|
|
|
await new Promise((r) => setTimeout(r, 20));
|
|
const requestId = [...map["entries"].keys()][0];
|
|
|
|
await new Promise((r) => setTimeout(r, 50));
|
|
map.respond(requestId, localEnvelope("event1", "test.heartbeat"));
|
|
|
|
await new Promise((r) => setTimeout(r, 50));
|
|
map.respond(requestId, localEnvelope("event2", "test.heartbeat"));
|
|
|
|
await new Promise((r) => setTimeout(r, 50));
|
|
map.respond(requestId, localEnvelope("event3", "test.heartbeat"));
|
|
|
|
await consumePromise;
|
|
|
|
expect(results).toHaveLength(3);
|
|
expect(results[0].data).toBe("event1");
|
|
expect(results[1].data).toBe("event2");
|
|
expect(results[2].data).toBe("event3");
|
|
});
|
|
|
|
it("abort() closes subscription iterator", async () => {
|
|
const map = new PendingRequestMap();
|
|
|
|
const subscribeIter = map.subscribe("test.stream", {});
|
|
let iterationCompleted = false;
|
|
|
|
const consumePromise = (async () => {
|
|
for await (const _ of subscribeIter) {
|
|
// will receive abort
|
|
}
|
|
iterationCompleted = true;
|
|
})();
|
|
|
|
await new Promise((r) => setTimeout(r, 20));
|
|
const requestId = [...map["entries"].keys()][0];
|
|
|
|
map.abort(requestId);
|
|
|
|
await consumePromise;
|
|
expect(iterationCompleted).toBe(true);
|
|
});
|
|
|
|
it("tracks subscribe entries in pending count", async () => {
|
|
const map = new PendingRequestMap();
|
|
|
|
const subscribeIter = map.subscribe("test.stream", {});
|
|
|
|
const consumePromise = (async () => {
|
|
for await (const _ of subscribeIter) {
|
|
break;
|
|
}
|
|
})();
|
|
|
|
await new Promise((r) => setTimeout(r, 30));
|
|
expect(map.getPendingCount()).toBe(1);
|
|
|
|
const requestId = [...map["entries"].keys()][0];
|
|
map.abort(requestId);
|
|
await new Promise((r) => setTimeout(r, 20));
|
|
|
|
await consumePromise;
|
|
expect(map.getPendingCount()).toBe(0);
|
|
});
|
|
|
|
it("subscribe without idleTimeout never times out", async () => {
|
|
const map = new PendingRequestMap();
|
|
|
|
const subscribeIter = map.subscribe("test.persistent", {});
|
|
let eventCount = 0;
|
|
|
|
const consumePromise = (async () => {
|
|
for await (const _ of subscribeIter) {
|
|
eventCount++;
|
|
if (eventCount >= 1) break;
|
|
}
|
|
})();
|
|
|
|
await new Promise((r) => setTimeout(r, 50));
|
|
const requestId = [...map["entries"].keys()][0];
|
|
map.respond(requestId, localEnvelope("event", "test.persistent"));
|
|
|
|
await consumePromise;
|
|
expect(eventCount).toBe(1);
|
|
});
|
|
|
|
it("call() with past deadline times out immediately", async () => {
|
|
const map = new PendingRequestMap();
|
|
|
|
const deadline = Date.now() - 100;
|
|
const callPromise = map.call("test.op", { value: "hello" }, { deadline });
|
|
|
|
await expect(callPromise).rejects.toThrow("timed out");
|
|
await expect(callPromise).rejects.toBeInstanceOf(CallError);
|
|
});
|
|
});
|
|
|
|
describe("CallHandler SUBSCRIPTION dispatch", () => {
|
|
it("dispatches SUBSCRIPTION operations to subscribe()", async () => {
|
|
const registry = new OperationRegistry();
|
|
registry.register({
|
|
name: "events",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.SUBSCRIPTION,
|
|
description: "event stream",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Unknown(),
|
|
accessControl: { requiredScopes: [] },
|
|
handler: async function* (_input: unknown, _context: unknown) {
|
|
yield "event1";
|
|
yield "event2";
|
|
},
|
|
});
|
|
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const subscribeIter = callMap.subscribe("test.events", {});
|
|
const results: ResponseEnvelope[] = [];
|
|
const consumePromise = (async () => {
|
|
for await (const envelope of subscribeIter) {
|
|
results.push(envelope);
|
|
if (results.length === 2) break;
|
|
}
|
|
})();
|
|
|
|
await new Promise((r) => setTimeout(r, 20));
|
|
const requestId = [...callMap["entries"].keys()][0];
|
|
|
|
await handler({
|
|
requestId,
|
|
operationId: "test.events",
|
|
input: {},
|
|
});
|
|
|
|
await consumePromise;
|
|
|
|
expect(results).toHaveLength(2);
|
|
expect(results[0].data).toBe("event1");
|
|
expect(results[0].meta.source).toBe("local");
|
|
expect(results[1].data).toBe("event2");
|
|
});
|
|
|
|
it("publishes call.error for SUBSCRIPTION access denied", async () => {
|
|
const registry = new OperationRegistry();
|
|
registry.register({
|
|
name: "guarded",
|
|
namespace: "test",
|
|
version: "1.0.0",
|
|
type: OperationType.SUBSCRIPTION,
|
|
description: "guarded sub",
|
|
inputSchema: Type.Object({}),
|
|
outputSchema: Type.Unknown(),
|
|
accessControl: { requiredScopes: ["admin"] },
|
|
handler: async function* (_input: unknown, _context: unknown) {
|
|
yield "secret";
|
|
},
|
|
});
|
|
|
|
const callMap = new PendingRequestMap();
|
|
const handler = buildCallHandler({ registry, callMap });
|
|
|
|
const identity: Identity = { id: "user1", scopes: [] };
|
|
const subscribeIter = callMap.subscribe("test.guarded", {}, { identity });
|
|
let caughtError: unknown;
|
|
|
|
const consumePromise = (async () => {
|
|
try {
|
|
for await (const _ of subscribeIter) {
|
|
// should not reach
|
|
}
|
|
} catch (error) {
|
|
caughtError = error;
|
|
}
|
|
})();
|
|
|
|
await new Promise((r) => setTimeout(r, 20));
|
|
const requestId = [...callMap["entries"].keys()][0];
|
|
|
|
await handler({
|
|
requestId,
|
|
operationId: "test.guarded",
|
|
input: {},
|
|
identity,
|
|
});
|
|
|
|
await consumePromise;
|
|
|
|
expect(caughtError).toBeInstanceOf(CallError);
|
|
expect((caughtError as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED);
|
|
});
|
|
}); |