fix: resolve all critical and high-priority issues from pre-release review

Critical fixes:
- C-01: Extract enforceAccess() into access.ts, use in both registry.execute() and subscribe()
- C-02: Add validateOrThrow() to subscribe() (was missing input validation)
- C-03: Add .catch() handlers to PendingRequestMap.setupSubscriptions() IIFEs
- C-04: MCP adapter now throws CallError on isError: true instead of returning success envelope
- C-05: Replace bare 'EXECUTION_ERROR' string literals with InfrastructureErrorCode.EXECUTION_ERROR enum

High-priority fixes:
- H-01: Remove customAuth stub from AccessControlSchema (was defined but never enforced)
- H-02: Add metadata field to CallRequestedEvent and propagate through buildCallHandler
- H-03: Remove dead stream/pubsub stubs from OperationContext type
- H-04: MCP handler now accepts OperationContext parameter (was discarding it)
- H-05: Use spec.info.version in OpenAPI adapter, config.version in MCP adapter (was hard-coded 1.0.0)
- H-06: FromOpenAPIFile now throws clear error when node:fs/promises unavailable (was unhandled crash)
This commit is contained in:
2026-05-16 10:18:22 +00:00
parent 3e1884cd23
commit a2f64f1dcb
9 changed files with 265 additions and 61 deletions

View File

@@ -0,0 +1,183 @@
# Pre-Release Review: @alkdev/operations
**Date:** 2026-05-16
**Scope:** Full codebase review for issues that would impact downstream hub/spoke implementations
**Status:** C-01 through C-05 and H-01 through H-06 resolved. M-01 through M-08 and L-01 through L-05 remain as follow-ups.
---
## Critical (must fix before downstream use)
### C-01. Duplicated access control logic between registry.execute() and subscribe()
**Files:** `src/registry.ts:107-124`, `src/subscribe.ts:33-51`
`subscribe()` copy-pastes the access control check from `registry.execute()` instead of calling `checkAccess()`. The inline version has slightly different error message formatting and will silently diverge from registry behavior when `customAuth` or any new access control feature is added.
Both paths should call `checkAccess()` directly, and the inline error-throwing logic should be extracted into a shared helper (e.g., `enforceAccess()` or similar).
### C-02. subscribe() skips input validation
**File:** `src/subscribe.ts` (full file)
`registry.execute()` calls `validateOrThrow(spec.inputSchema, input)` before invoking the handler. `subscribe()` does not validate input at all — it checks access then calls the handler directly. Invalid input won't fail until deep in handler code with an unhelpful error.
### C-03. Unhandled promise rejections in PendingRequestMap.setupSubscriptions()
**File:** `src/call.ts:93-113, 115-133, 135-153`
Three async IIFEs `(async () => { ... })()` are started in `setupSubscriptions()` with no `.catch()` handler. If any of these async iterators throw (pubsub error, connection loss, etc.), the promise rejection is unhandled and will crash the process under Node.js `--unhandled-rejections=throw` (the default since Node 15).
Each IIFE should have a `.catch()` that logs the error and optionally re-throws or signals failure.
### C-04. MCP adapter never surfaces isError: true as failure
**File:** `src/from_mcp.ts:130-159`
When an MCP tool call returns `{ isError: true }`, the handler wraps it in a normal `mcpEnvelope` with `meta.isError: true`. This means `registry.execute()` returns what appears to be a successful `ResponseEnvelope`. Downstream consumers must manually check `meta.isError` — easy to miss.
The OpenAPI adapter consistently throws `CallError` on HTTP error statuses (4xx/5xx). The MCP adapter should do the same when `result.isError === true`, wrapping the error content in a `CallError` rather than a successful envelope.
### C-05. from_openapi.ts uses bare string "EXECUTION_ERROR" instead of InfrastructureErrorCode enum
**File:** `src/from_openapi.ts:361, 453`
Uses `"EXECUTION_ERROR"` as a string literal instead of `InfrastructureErrorCode.EXECUTION_ERROR`. If the enum value ever changes or is refactored, these sites will silently break. All other modules use the enum constant.
---
## High (should fix before downstream use)
### H-01. customAuth defined in AccessControlSchema but never enforced
**File:** `src/types.ts:60` defines `customAuth: Type.Optional(Type.String())`
**File:** `src/access.ts``checkAccess()` never references `accessControl.customAuth`
If a hub/spoke author defines `customAuth` on an operation's access control, it is silently ignored. This creates a false sense of security. Either implement the custom auth hook mechanism or remove the field until it can be properly supported.
### H-02. context.metadata not propagated through buildCallHandler
**File:** `src/call.ts:286-290`
The `CallRequestedEvent` schema does not include a `metadata` field. When `buildCallHandler` constructs the `OperationContext`, it only sets `requestId`, `parentRequestId`, and `identity`. Any caller-provided metadata is silently dropped at the call protocol boundary.
The `CallRequestedEvent` type should include an optional `metadata` field, and `buildCallHandler` should propagate it to the context.
### H-03. OperationContext.stream and pubsub are typed but never populated
**File:** `src/types.ts:36-38`
```typescript
export type OperationContext = OperationContextBase & {
env?: OperationEnv
stream?: () => AsyncIterable<unknown>
pubsub?: unknown
}
```
Neither `stream` nor `pubsub` are ever set by the library. `registry.execute()` constructs context without them. Downstream consumers who see these on the type will expect them to work and will get `undefined`.
Either implement these or remove them from the type and add them back when the feature is ready. Leaving dead stubs on the public API surface is worse than not having them.
### H-04. MCP handler discards context parameter entirely
**File:** `src/from_mcp.ts:130`
```typescript
handler: async (input: unknown) => {
```
The handler signature only takes `input`, discarding the `context` parameter. This means identity, request tracing, and metadata cannot flow through to MCP tool calls. The handler should at minimum propagate `context.identity` or a subset of context to the MCP call.
### H-05. Hard-coded version "1.0.0" in both adapters
**File:** `src/from_mcp.ts:123` — All MCP tools get `version: "1.0.0"`
**File:** `src/from_openapi.ts:399, 476` — All OpenAPI operations get `version: "1.0.0"`
Both adapters have access to version information (MCP server info, OpenAPI `info.version`) but ignore it. The MCP adapter should use the server version from `client.getServerVersion()` or accept a configurable default. The OpenAPI adapter should use `spec.info.version`.
### H-06. from_openapi.ts uses import("node:fs/promises") directly — not runtime-agnostic
**File:** `src/from_openapi.ts:518`
`FromOpenAPIFile` does `const { readFile } = await import("node:fs/promises")`. This violates the project constraint of runtime-agnostic code. The `OpenAPIFS` interface exists but `FromOpenAPIFile` bypasses it with a direct Node import. Should inject the file read like `scanner.ts` does with `ScannerFS`, or `FromOpenAPIFile` should accept an `OpenAPIFS` parameter.
---
## Medium (fix in follow-up)
### M-01. Duplicate OperationDefinitionSchema vs OperationSpecSchema
**File:** `src/types.ts:83-101` vs `121-138`
These two schemas are nearly identical, differing only in the `handler` field. Any field added to one must be added to the other. They should be composed (e.g., OperationDefinitionSchema extends OperationSpecSchema + handler).
### M-02. No subpath export for from_openapi
**File:** `package.json` exports only `./from-mcp` and `./from-typemap` as subpath entries. `from_openapi.ts` is bundled into the main entry, meaning anyone importing `PendingRequestMap` or `OperationRegistry` also pulls in the OpenAPI adapter code (including `fetch` usage and `node:fs/promises` import). This should be a separate subpath export for tree-shaking.
### M-03. Subscription deadline semantics are ambiguous
**File:** `src/call.ts:103-109`
When a `call.responded` event arrives for a subscription, the deadline timer is reset to the same `entry.state.deadline` value. If `deadline` is an absolute timestamp (e.g., `Date.now() + 5000`), the new timer delay would be `deadline - Date.now()`, which shrinks over time and could become negative. The documentation doesn't specify whether `deadline` is absolute or relative. This should be clarified and the reset logic adjusted.
### M-04. No unsubscribe/completion signaling in call protocol
**File:** `src/call.ts:298-301`
When a subscription ends (the `for await` loop completes), `buildCallHandler` simply stops calling `callMap.respond()`. There is no `call.completed` or `call.finished` event type to explicitly signal subscription completion. The consumer must detect iterator completion, which works with `Repeater` but may not work with all pubsub implementations.
### M-05. mapError uses fragile message.includes(code) matching
**File:** `src/error.ts:37-41`
Error matching against `errorSchemas` uses `message.includes(schema.code)`, which can match incorrectly (e.g., `"ITEM_NOT_FOUND"` matches `NOT_FOUND`). Should use exact prefix/suffix matching or a structured error code protocol.
### M-06. from_mcp.ts uses multiple `any` casts
**File:** `src/from_mcp.ts:91, 115, 137, 155-156, 173`
Five `any` casts in the MCP adapter. If the MCP SDK types change, these will silently break at runtime. Should use narrow type assertions or import the actual SDK types.
### M-07. No injectable HTTP client for from_openapi
**File:** `src/from_openapi.ts:354, 444, 527`
All HTTP operations use the global `fetch()` API with no way to inject a custom HTTP client. Hub/spoke implementations running in constrained environments (custom proxy, no direct network, test mocking) cannot override the transport. Should accept an optional `fetch` parameter in `HTTPServiceConfig`.
### M-08. No cleanup/disconnect for OpenAPI adapter
Unlike `MCPClientLoader.closeAll()`, there is no cleanup path for OpenAPI HTTP connections. Long-running hub processes that dynamically add/remove OpenAPI services may leak resources.
---
## Low (tech debt, nice-to-fix)
### L-01. subscribe() casts handler return as AsyncGenerator without validation
**File:** `src/subscribe.ts:53`
`handler(input, context) as AsyncGenerator` bypasses runtime type checking. If someone registers a regular async function (not a generator) for a SUBSCRIPTION operation, this fails at iteration time with a confusing error. Should validate at registration time that subscription operations have a `SubscriptionHandler`.
### L-02. isResponseEnvelope type guard is lenient
**File:** `src/response-envelope.ts:132-138`
Checks for `data` and `meta.source` but doesn't validate `meta.operationId`, `meta.timestamp`, or the structure of `data`. A hub checking `isResponseEnvelope()` could pass malformed data through.
### L-03. from_schema.ts FromSchema returns Type.Unknown for unrecognized input
**File:** `src/from_schema.ts:114`
An empty object `{}` silently becomes `Type.Unknown({})`, which validates anything. Should at minimum log a warning.
### L-04. from_openapi.ts SSE GET includes requestBody handling
**File:** `src/from_openapi.ts:329, 341`
GET-based SSE subscriptions include body parameter handling, which is unusual for SSE. This is noted in a comment but not addressed.
### L-05. No convenience registerAll methods on MCP or OpenAPI adapters
`MCPClientLoader` has `getAllOperations()` but no `registerAll(registry)`. `FromOpenAPI` returns a plain array with no helper. Consumers must iterate manually. Minor ergonomics issue.

View File

@@ -1,4 +1,5 @@
import type { AccessControl, Identity } from "./types.js"; import type { AccessControl, Identity } from "./types.js";
import { CallError, InfrastructureErrorCode } from "./error.js";
export function checkAccess(accessControl: AccessControl, identity: Identity): boolean { export function checkAccess(accessControl: AccessControl, identity: Identity): boolean {
const { requiredScopes, requiredScopesAny, resourceType, resourceAction } = accessControl; const { requiredScopes, requiredScopesAny, resourceType, resourceAction } = accessControl;
@@ -25,3 +26,29 @@ export function checkAccess(accessControl: AccessControl, identity: Identity): b
return true; return true;
} }
export function enforceAccess(
accessControl: AccessControl,
identity: Identity | undefined,
operationId: string,
trusted?: boolean,
): void {
if (trusted) return;
if (accessControl.requiredScopes.length > 0 || accessControl.requiredScopesAny?.length || accessControl.resourceType) {
if (!identity) {
throw new CallError(
InfrastructureErrorCode.ACCESS_DENIED,
`Access denied for operation: ${operationId} — identity required`,
{ operationId, requiredScopes: accessControl.requiredScopes },
);
}
if (!checkAccess(accessControl, identity)) {
throw new CallError(
InfrastructureErrorCode.ACCESS_DENIED,
`Access denied for operation: ${operationId}`,
{ requiredScopes: accessControl.requiredScopes },
);
}
}
}

View File

@@ -7,6 +7,9 @@ import { ResponseEnvelopeSchema, isResponseEnvelope } from "./response-envelope.
import type { ResponseEnvelope } from "./response-envelope.js"; import type { ResponseEnvelope } from "./response-envelope.js";
import type { Identity, OperationContext } from "./types.js"; import type { Identity, OperationContext } from "./types.js";
import { OperationType } from "./types.js"; import { OperationType } from "./types.js";
import { getLogger } from "@logtape/logtape";
const logger = getLogger("operations:call");
export const CallEventSchema = { export const CallEventSchema = {
"call.requested": Type.Object({ "call.requested": Type.Object({
@@ -20,6 +23,7 @@ export const CallEventSchema = {
scopes: Type.Array(Type.String()), scopes: Type.Array(Type.String()),
resources: Type.Optional(Type.Record(Type.String(), Type.Array(Type.String()))), resources: Type.Optional(Type.Record(Type.String(), Type.Array(Type.String()))),
})), })),
metadata: Type.Optional(Type.Record(Type.String(), Type.Unknown())),
}), }),
"call.responded": Type.Object({ "call.responded": Type.Object({
requestId: Type.String(), requestId: Type.String(),
@@ -110,7 +114,9 @@ export class PendingRequestMap {
entry.state.push(responded.output as ResponseEnvelope); entry.state.push(responded.output as ResponseEnvelope);
} }
} }
})(); })().catch((error) => {
logger.error(`call.responded listener error: ${error instanceof Error ? error.message : String(error)}`);
});
const errorIter = this.pubsub.subscribe("call.error", ""); const errorIter = this.pubsub.subscribe("call.error", "");
(async () => { (async () => {
@@ -130,7 +136,9 @@ export class PendingRequestMap {
this.entries.delete(err.requestId); this.entries.delete(err.requestId);
} }
} }
})(); })().catch((error) => {
logger.error(`call.error listener error: ${error instanceof Error ? error.message : String(error)}`);
});
const abortedIter = this.pubsub.subscribe("call.aborted", ""); const abortedIter = this.pubsub.subscribe("call.aborted", "");
(async () => { (async () => {
@@ -150,7 +158,9 @@ export class PendingRequestMap {
this.entries.delete(aborted.requestId); this.entries.delete(aborted.requestId);
} }
} }
})(); })().catch((error) => {
logger.error(`call.aborted listener error: ${error instanceof Error ? error.message : String(error)}`);
});
} }
private startSubscriptionTimer(requestId: string, deadline: number): ReturnType<typeof setTimeout> { private startSubscriptionTimer(requestId: string, deadline: number): ReturnType<typeof setTimeout> {
@@ -281,12 +291,13 @@ export function buildCallHandler(config: CallHandlerConfig): CallHandler {
const { registry, callMap } = config; const { registry, callMap } = config;
return async (event: CallRequestedEvent): Promise<void> => { return async (event: CallRequestedEvent): Promise<void> => {
const { requestId, operationId, input, identity } = event; const { requestId, operationId, input, identity, metadata } = event;
const context: OperationContext = { const context: OperationContext = {
requestId, requestId,
parentRequestId: event.parentRequestId, parentRequestId: event.parentRequestId,
identity, identity,
metadata,
}; };
try { try {

View File

@@ -1,4 +1,4 @@
import type { OperationSpec, OperationHandler } from "./types.js"; import type { OperationSpec, OperationHandler, OperationContext } from "./types.js";
import { OperationType } from "./types.js"; import { OperationType } from "./types.js";
import { Kind, Type, type TSchema } from "@alkdev/typebox"; import { Kind, Type, type TSchema } from "@alkdev/typebox";
import { Value } from "@alkdev/typebox/value"; import { Value } from "@alkdev/typebox/value";
@@ -16,6 +16,7 @@ export interface MCPClientConfig {
cwd?: string; cwd?: string;
url?: string; url?: string;
headers?: Record<string, string>; headers?: Record<string, string>;
version?: string;
} }
export interface MCPClientWrapper { export interface MCPClientWrapper {
@@ -120,20 +121,30 @@ export async function createMCPClient(
return { return {
name: tool.name, name: tool.name,
namespace: name, namespace: name,
version: "1.0.0", version: config.version || "1.0.0",
type: OperationType.MUTATION, type: OperationType.MUTATION,
description: tool.description || "", description: tool.description || "",
tags: [], tags: [],
inputSchema: FromSchema(tool.inputSchema) as TSchema, inputSchema: FromSchema(tool.inputSchema) as TSchema,
outputSchema, outputSchema,
accessControl: { requiredScopes: [] }, accessControl: { requiredScopes: [] },
handler: async (input: unknown) => { handler: async (input: unknown, context: OperationContext) => {
logger.debug(`Calling MCP tool: ${name}.${tool.name}`); logger.debug(`Calling MCP tool: ${name}.${tool.name}`, context.identity ? { identity: context.identity.id } : undefined);
const result = await client.callTool({ const result = await client.callTool({
name: tool.name, name: tool.name,
arguments: input as Record<string, unknown>, arguments: input as Record<string, unknown>,
}); });
if (result.isError) {
const contentBlocks = Array.isArray(result.content) ? result.content : [];
const errorContent = mapMCPContentBlocks(contentBlocks);
throw new CallError(
InfrastructureErrorCode.EXECUTION_ERROR,
`MCP tool ${name}.${tool.name} returned error: ${errorContent.map(b => b.type === "text" ? b.text : JSON.stringify(b)).join("; ")}`,
{ toolName: tool.name, serverName: name, isError: true },
);
}
const structuredContent = (result as any).structuredContent as Record<string, unknown> | undefined; const structuredContent = (result as any).structuredContent as Record<string, unknown> | undefined;
const contentBlocks = Array.isArray(result.content) ? result.content : []; const contentBlocks = Array.isArray(result.content) ? result.content : [];
@@ -146,7 +157,7 @@ export async function createMCPClient(
: mapMCPContentBlocks(contentBlocks); : mapMCPContentBlocks(contentBlocks);
const meta: Omit<MCPResponseMeta, "source"> = { const meta: Omit<MCPResponseMeta, "source"> = {
isError: Boolean(result.isError), isError: false,
content: mapMCPContentBlocks(contentBlocks), content: mapMCPContentBlocks(contentBlocks),
}; };
if (structuredContent != null) { if (structuredContent != null) {

View File

@@ -1,7 +1,7 @@
import * as Type from "@alkdev/typebox"; import * as Type from "@alkdev/typebox";
import { FromSchema } from "./from_schema.js"; import { FromSchema } from "./from_schema.js";
import { OperationType, type OperationSpec, type OperationHandler, type SubscriptionHandler, type OperationContext } from "./types.js"; import { OperationType, type OperationSpec, type OperationHandler, type SubscriptionHandler, type OperationContext } from "./types.js";
import { CallError } from "./error.js"; import { CallError, InfrastructureErrorCode } from "./error.js";
import { httpEnvelope } from "./response-envelope.js"; import { httpEnvelope } from "./response-envelope.js";
export interface OpenAPIFS { export interface OpenAPIFS {
@@ -321,6 +321,7 @@ function createHTTPOperation(
): OperationSpec & { handler: HTTPOperationHandler } { ): OperationSpec & { handler: HTTPOperationHandler } {
const operationId = normalizeOperationId(operation, method, path); const operationId = normalizeOperationId(operation, method, path);
const opType = detectOperationType(method, operation); const opType = detectOperationType(method, operation);
const apiVersion = spec.info?.version || "1.0.0";
const authHeaders = getAuthHeaders(config); const authHeaders = getAuthHeaders(config);
const responseHeaders = (): Record<string, string> => ({ ...authHeaders, "Content-Type": "application/json" }); const responseHeaders = (): Record<string, string> => ({ ...authHeaders, "Content-Type": "application/json" });
@@ -358,7 +359,7 @@ function createHTTPOperation(
}); });
if (!response.ok) { if (!response.ok) {
throw new CallError("EXECUTION_ERROR", `HTTP ${response.status}: ${response.statusText}`); throw new CallError(InfrastructureErrorCode.EXECUTION_ERROR, `HTTP ${response.status}: ${response.statusText}`);
} }
const reader = response.body!.getReader(); const reader = response.body!.getReader();
@@ -398,7 +399,7 @@ function createHTTPOperation(
return { return {
name: operationId, name: operationId,
namespace: config.namespace, namespace: config.namespace,
version: "1.0.0", version: apiVersion,
type: opType, type: opType,
description: operation.description || operation.summary || `${method.toUpperCase()} ${path}`, description: operation.description || operation.summary || `${method.toUpperCase()} ${path}`,
tags: operation.tags, tags: operation.tags,
@@ -449,7 +450,7 @@ function createHTTPOperation(
}); });
if (!response.ok) { if (!response.ok) {
throw new CallError("EXECUTION_ERROR", `HTTP ${response.status}: ${response.statusText}`); throw new CallError(InfrastructureErrorCode.EXECUTION_ERROR, `HTTP ${response.status}: ${response.statusText}`);
} }
const contentType = response.headers.get("Content-Type") || ""; const contentType = response.headers.get("Content-Type") || "";
@@ -472,7 +473,7 @@ function createHTTPOperation(
return { return {
name: operationId, name: operationId,
namespace: config.namespace, namespace: config.namespace,
version: "1.0.0", version: apiVersion,
type: opType, type: opType,
description: operation.description || operation.summary || `${method.toUpperCase()} ${path}`, description: operation.description || operation.summary || `${method.toUpperCase()} ${path}`,
tags: operation.tags, tags: operation.tags,
@@ -515,8 +516,15 @@ export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, f
if (fs) { if (fs) {
content = await fs.readFile(path); content = await fs.readFile(path);
} else { } else {
try {
const { readFile } = await import("node:fs/promises"); const { readFile } = await import("node:fs/promises");
content = await readFile(path, "utf-8"); content = await readFile(path, "utf-8");
} catch {
throw new CallError(InfrastructureErrorCode.EXECUTION_ERROR,
"FromOpenAPIFile: no filesystem provider given and node:fs/promises is not available. " +
"Provide an OpenAPIFS implementation via the third argument."
);
}
} }
const spec = JSON.parse(content) as OpenAPISpec; const spec = JSON.parse(content) as OpenAPISpec;
return FromOpenAPI(spec, config); return FromOpenAPI(spec, config);

View File

@@ -14,7 +14,7 @@ export { CallError, InfrastructureErrorCode, mapError } from "./error.js";
export type { CallErrorCode } from "./error.js"; export type { CallErrorCode } from "./error.js";
export { PendingRequestMap, buildCallHandler } from "./call.js"; export { PendingRequestMap, buildCallHandler } from "./call.js";
export type { CallEventMap, CallEventMapValue, CallRequestedEvent, CallRespondedEvent, CallAbortedEvent, CallErrorEvent, CallHandler, CallHandlerConfig } from "./call.js"; export type { CallEventMap, CallEventMapValue, CallRequestedEvent, CallRespondedEvent, CallAbortedEvent, CallErrorEvent, CallHandler, CallHandlerConfig } from "./call.js";
export { checkAccess } from "./access.js"; export { checkAccess, enforceAccess } from "./access.js";
export { subscribe } from "./subscribe.js"; export { subscribe } from "./subscribe.js";
export { defaultAdapter, zodAdapter, valibotAdapter } from "./from_typemap.js"; export { defaultAdapter, zodAdapter, valibotAdapter } from "./from_typemap.js";
export type { SchemaAdapter } from "./from_typemap.js"; export type { SchemaAdapter } from "./from_typemap.js";

View File

@@ -5,7 +5,7 @@ import { KindGuard, type TSchema } from "@alkdev/typebox";
import { assertIsSchema, validateOrThrow, collectErrors, formatValueErrors } from "./validation.js"; import { assertIsSchema, validateOrThrow, collectErrors, formatValueErrors } from "./validation.js";
import { isResponseEnvelope, localEnvelope, type ResponseEnvelope } from "./response-envelope.js"; import { isResponseEnvelope, localEnvelope, type ResponseEnvelope } from "./response-envelope.js";
import { CallError, InfrastructureErrorCode } from "./error.js"; import { CallError, InfrastructureErrorCode } from "./error.js";
import { checkAccess } from "./access.js"; import { checkAccess, enforceAccess } from "./access.js";
import type { SchemaAdapter } from "./from_typemap.js"; import type { SchemaAdapter } from "./from_typemap.js";
import { defaultAdapter } from "./from_typemap.js"; import { defaultAdapter } from "./from_typemap.js";
@@ -121,25 +121,7 @@ export class OperationRegistry {
); );
} }
if (!context.trusted) { enforceAccess(spec.accessControl as AccessControl, context.identity, operationId, context.trusted);
const accessControl: AccessControl = spec.accessControl as AccessControl;
if (accessControl.requiredScopes.length > 0 || accessControl.requiredScopesAny?.length || accessControl.resourceType) {
if (!context.identity) {
throw new CallError(
InfrastructureErrorCode.ACCESS_DENIED,
`Access denied for operation: ${operationId} — identity required`,
{ operationId, requiredScopes: accessControl.requiredScopes },
);
}
if (!checkAccess(accessControl, context.identity)) {
throw new CallError(
InfrastructureErrorCode.ACCESS_DENIED,
`Access denied for operation: ${operationId}`,
{ requiredScopes: accessControl.requiredScopes },
);
}
}
}
validateOrThrow(spec.inputSchema, input, `Input validation failed for ${operationId}`); validateOrThrow(spec.inputSchema, input, `Input validation failed for ${operationId}`);

View File

@@ -1,8 +1,9 @@
import type { OperationContext, AccessControl } from "./types.js"; import type { OperationContext } from "./types.js";
import { OperationRegistry } from "./registry.js"; import { OperationRegistry } from "./registry.js";
import { type ResponseEnvelope, isResponseEnvelope, localEnvelope } from "./response-envelope.js"; import { type ResponseEnvelope, isResponseEnvelope, localEnvelope } from "./response-envelope.js";
import { CallError, InfrastructureErrorCode } from "./error.js"; import { CallError, InfrastructureErrorCode } from "./error.js";
import { checkAccess } from "./access.js"; import { enforceAccess } from "./access.js";
import { validateOrThrow } from "./validation.js";
export async function* subscribe( export async function* subscribe(
registry: OperationRegistry, registry: OperationRegistry,
@@ -30,25 +31,9 @@ export async function* subscribe(
); );
} }
if (!context.trusted) { enforceAccess(spec.accessControl, context.identity, operationId, context.trusted);
const accessControl: AccessControl = spec.accessControl as AccessControl;
if (accessControl.requiredScopes.length > 0 || accessControl.requiredScopesAny?.length || accessControl.resourceType) { validateOrThrow(spec.inputSchema, input, `Input validation failed for ${operationId}`);
if (!context.identity) {
throw new CallError(
InfrastructureErrorCode.ACCESS_DENIED,
`Access denied for operation: ${operationId} — identity required`,
{ operationId, requiredScopes: accessControl.requiredScopes },
);
}
if (!checkAccess(accessControl, context.identity)) {
throw new CallError(
InfrastructureErrorCode.ACCESS_DENIED,
`Access denied for operation: ${operationId}`,
{ requiredScopes: accessControl.requiredScopes },
);
}
}
}
const generator = handler(input, context) as AsyncGenerator<unknown, void, unknown>; const generator = handler(input, context) as AsyncGenerator<unknown, void, unknown>;

View File

@@ -33,8 +33,6 @@ type OperationContextBase = Static<typeof OperationContextSchema>
export type OperationContext = OperationContextBase & { export type OperationContext = OperationContextBase & {
env?: OperationEnv env?: OperationEnv
stream?: () => AsyncIterable<unknown>
pubsub?: unknown
} }
export const ErrorDefinitionSchema = Type.Object({ export const ErrorDefinitionSchema = Type.Object({
@@ -57,7 +55,6 @@ export const AccessControlSchema = Type.Object({
Type.Array(Type.String({description: "Required scopes (at least one must match)"}))), Type.Array(Type.String({description: "Required scopes (at least one must match)"}))),
resourceType: Type.Optional(Type.String({description: "Resource Type e.g., project, tool, data"})), resourceType: Type.Optional(Type.String({description: "Resource Type e.g., project, tool, data"})),
resourceAction: Type.Optional(Type.String({description: "Required action on the resource e.g., read, write, execute"})), resourceAction: Type.Optional(Type.String({description: "Required action on the resource e.g., read, write, execute"})),
customAuth: Type.Optional(Type.String({description: "Name of custom auth function"})),
}); });
export type AccessControl = Static<typeof AccessControlSchema>; export type AccessControl = Static<typeof AccessControlSchema>;