Advanced concepts
Entity cache, hooks, BYO-KMS, permission system, and error taxonomy.
Entity cache (EntityStore)
The EntityStore provides a type-safe, queryable cache over plugin data. When a plugin defines a schema, the runtime creates an entity table and exposes db.{plugin}.{entity}.* operations.
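To make the shape of those operations concrete, here is a minimal in-memory sketch of one entity table. It is not the SDK's implementation: `EntityTable`, `Row`, and `Filter` are hypothetical names, the methods are synchronous for brevity (the real `db.*` calls are awaited), and only equality and `contains` filters plus a single `orderBy` field are handled.

```typescript
// Hypothetical in-memory stand-in for one entity table (not the SDK's code).
type Row = { id: string; [field: string]: string | undefined };
type Filter = string | { contains: string };
type FindMany = {
  where?: Record<string, Filter>;
  orderBy?: Record<string, "asc" | "desc">;
  take?: number;
};

class EntityTable<T extends Row> {
  private rows = new Map<string, T>();

  findUnique({ where }: { where: { id: string } }): T | null {
    return this.rows.get(where.id) ?? null;
  }

  findMany(query: FindMany = {}): T[] {
    const where: Record<string, Filter> = query.where ?? {};
    const matches = Array.from(this.rows.values()).filter((row) =>
      Object.entries(where).every(([field, filter]) => {
        const value = (row as Row)[field];
        if (typeof filter === "string") return value === filter;
        return value !== undefined && value.includes(filter.contains);
      }),
    );
    // Only the first orderBy field is honored in this sketch.
    const orderBy: Record<string, "asc" | "desc"> = query.orderBy ?? {};
    const order = Object.entries(orderBy)[0];
    if (order) {
      const [field, direction] = order;
      matches.sort((a, b) => {
        const x = (a as Row)[field] ?? "";
        const y = (b as Row)[field] ?? "";
        const cmp = x < y ? -1 : x > y ? 1 : 0;
        return direction === "asc" ? cmp : -cmp;
      });
    }
    return matches.slice(0, query.take ?? matches.length);
  }

  upsert(op: { where: { id: string }; create: T; update: Partial<T> }): T {
    const existing = this.rows.get(op.where.id);
    const next = existing ? { ...existing, ...op.update } : op.create;
    this.rows.set(op.where.id, next);
    return next;
  }

  delete({ where }: { where: { id: string } }): boolean {
    return this.rows.delete(where.id);
  }
}

// Usage with a trimmed-down contact shape.
type Contact = { id: string; email: string; company?: string; createdAt: string };

const contacts = new EntityTable<Contact>();
contacts.upsert({
  where: { id: "1" },
  create: { id: "1", email: "ada@example.com", createdAt: "2024-01-02T00:00:00Z" },
  update: {},
});
contacts.upsert({
  where: { id: "2" },
  create: { id: "2", email: "grace@example.com", createdAt: "2024-03-05T00:00:00Z" },
  update: {},
});
contacts.upsert({
  where: { id: "3" },
  create: { id: "3", email: "linus@other.org", createdAt: "2024-04-01T00:00:00Z" },
  update: {},
});

const newest = contacts.findMany({
  where: { email: { contains: "@example.com" } },
  orderBy: { createdAt: "desc" },
  take: 1,
});
```

Keying rows by `id` keeps `findUnique`, `upsert`, and `delete` cheap, while `findMany` scans and filters. A real store would presumably push filters down to an index or database.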
How it works
// HubSpot defines a contacts schema
const HubSpotSchema = {
contacts: z.object({
id: z.string(),
email: z.string(),
firstName: z.string(),
lastName: z.string(),
company: z.string().optional(),
createdAt: z.string(),
}),
};
// The runtime creates:
// db.hubspot.contacts.findMany(...)
// db.hubspot.contacts.findUnique(...)
// db.hubspot.contacts.upsert(...)
// db.hubspot.contacts.delete(...)
Query patterns
// Find by ID
const contact = await db.hubspot.contacts.findUnique({
where: { id: "123" },
});
// Search with filter
const contacts = await db.hubspot.contacts.findMany({
where: {
email: { contains: "@example.com" },
company: { not: null },
},
orderBy: { createdAt: "desc" },
take: 50,
});
// Upsert after webhook
await db.hubspot.contacts.upsert({
where: { id: payload.objectId.toString() },
create: { /* ... */ },
update: { /* ... */ },
});
Cache invalidation
Webhook hooks keep the entity cache in sync: an update event upserts the cached row, and a delete event removes it:
hubspot({
webhookHooks: {
contacts: {
updated: {
after: async (ctx, result) => {
await db.hubspot.contacts.upsert({
where: { id: result.data.contact.id },
create: result.data.contact,
update: result.data.contact,
});
},
},
deleted: {
after: async (ctx, result) => {
await db.hubspot.contacts.delete({
where: { id: result.data.contact.id },
});
},
},
},
},
});
Hooks (before / after)
Hooks run around endpoint execution and webhook processing. They enable cross-cutting concerns: logging, caching, approval workflows, and side effects.
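As a rough mental model, a call passes through the before hook, the permission check, the endpoint itself, and then the after hook. The sketch below illustrates that sequence under stated assumptions: `runWithHooks`, `Ctx`, and `Hooks` are hypothetical names rather than SDK exports, the permission check is a callback that throws when the call is blocked, and the after hook is treated as a side effect only (its return value is ignored).

```typescript
// Hypothetical runner illustrating hook ordering; not the SDK's internals.
type Ctx = { userId: string };

type Hooks<A, R> = {
  before?: (ctx: Ctx, args: A) => Promise<{ ctx: Ctx; args: A; continue: boolean }>;
  after?: (ctx: Ctx, result: R) => Promise<unknown>;
};

async function runWithHooks<A, R>(
  ctx: Ctx,
  args: A,
  hooks: Hooks<A, R>,
  checkPermission: (ctx: Ctx, args: A) => void, // assumed to throw when blocked
  execute: (ctx: Ctx, args: A) => Promise<R>,
): Promise<R | undefined> {
  let current = { ctx, args, continue: true };
  if (hooks.before) {
    current = await hooks.before(ctx, args);
    // A before hook can stop the call entirely.
    if (!current.continue) return undefined;
  }
  checkPermission(current.ctx, current.args);
  const result = await execute(current.ctx, current.args);
  if (hooks.after) {
    // Side effects only in this sketch (caching, starting workflows).
    await hooks.after(current.ctx, result);
  }
  return result;
}

// Usage: record the order in which each stage runs.
const calls: string[] = [];
const sent = runWithHooks(
  { userId: "u1" },
  { text: "  hi  " },
  {
    before: async (ctx, args) => {
      calls.push("before");
      return { ctx, args: { text: args.text.trim() }, continue: true };
    },
    after: async () => {
      calls.push("after");
    },
  },
  () => {
    calls.push("permission");
  },
  async (_ctx, args) => {
    calls.push("execute");
    return { ok: true, text: args.text };
  },
);
```

Because the before hook runs first, the permission check and the endpoint both see the modified arguments (here, the trimmed text).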
Endpoint hooks
slack({
hooks: {
messages: {
send: {
before: async (ctx, args) => {
// Log the call
await auditLog.record({
userId: ctx.userId,
action: "slack:messages.send",
args,
});
// Modify args
return { ctx, args: { ...args, text: args.text.trim() }, continue: true };
},
after: async (ctx, result) => {
// Cache the result
await cache.set(`slack:msg:${result.ts}`, result, { ex: 300 });
return result;
},
},
},
},
});
Webhook hooks
stripe({
webhookHooks: {
charge: {
failed: {
before: async (ctx, payload) => {
// Filter: skip charges under $100 (Stripe amounts are in cents)
if (payload.data.object.amount < 10000) {
return { ctx, args: payload, continue: false };
}
return { ctx, args: payload, continue: true };
},
after: async (ctx, result) => {
// Start a workflow
await temporal.workflow.start(handleFailedPayment, {
args: [{ charge: result.data.charge }],
});
},
},
},
},
});
Hook execution order
before hook → permission check → endpoint execution → after hook
- before hook: can modify or reject
- permission check: gates on risk level + mode
- endpoint execution: returns data
- after hook: can side-effect (cache, workflow)
BYO-KMS (wrapKeyStoreWithKMS)
The SDK supports delegating encryption to external KMS providers:
import { wrapKeyStoreWithKMS } from "@fabricorg/integrations/kms";
// AWS KMS
const awsKms = wrapKeyStoreWithKMS({
provider: "aws",
keyId: "arn:aws:kms:us-east-1:123456789:key/12345678-1234-1234-1234-123456789012",
region: "us-east-1",
});
// GCP Cloud KMS
const gcpKms = wrapKeyStoreWithKMS({
provider: "gcp",
keyName: "projects/my-project/locations/global/keyRings/my-ring/cryptoKeys/my-key",
});
// Azure Key Vault
const azureKms = wrapKeyStoreWithKMS({
provider: "azure",
vaultUrl: "https://my-vault.vault.azure.net/",
keyName: "my-kek",
});
// Use in plugin config
const fabric = createFabric({
keyStore: awsKms,
plugins: [/* ... */],
});
Custom KMS adapter
Implement the KeyStore interface to plug in a proprietary KMS:
interface KeyStore {
encrypt(plaintext: string, keyId?: string): Promise<{ ciphertext: string; keyId: string }>;
decrypt(ciphertext: string, keyId: string): Promise<string>;
generateDek(): Promise<{ dek: string; encryptedDek: string }>;
}
const customKms: KeyStore = {
async encrypt(plaintext, keyId) {
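const response = await fetch("https://my-kms.example.com/encrypt", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ plaintext, keyId }),
});
// Surface KMS failures instead of parsing an error body as ciphertext
if (!response.ok) {
throw new Error(`KMS encrypt failed with status ${response.status}`);
}
const { ciphertext } = await response.json();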
return { ciphertext, keyId: keyId ?? "default" };
},
async decrypt(ciphertext, keyId) {
// ...
},
async generateDek() {
// ...
},
};
Permission system deep-dive
Risk levels
Every endpoint is classified by risk:
| Level | Description | Default policy (open) | Default policy (cautious) | Default policy (strict) | Default policy (readonly) |
|---|---|---|---|---|---|
| read | No mutation | Allow | Allow | Allow | Allow |
| write | Creates/updates | Allow | Require approval | Require approval | Deny |
| destructive | Deletes, archives | Allow | Require approval | Deny | Deny |
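The table can be read as a two-level lookup from mode and risk level to a decision. The sketch below encodes it that way. `Decision`, `DEFAULT_POLICY`, and `defaultDecision` are hypothetical names used for illustration, and the `"require_approval"` string is an assumed spelling, not a confirmed SDK value.

```typescript
// Hypothetical encoding of the default-policy table above.
type RiskLevel = "read" | "write" | "destructive";
type PermissionMode = "open" | "cautious" | "strict" | "readonly";
type Decision = "allow" | "require_approval" | "deny";

const DEFAULT_POLICY: Record<PermissionMode, Record<RiskLevel, Decision>> = {
  open: { read: "allow", write: "allow", destructive: "allow" },
  cautious: { read: "allow", write: "require_approval", destructive: "require_approval" },
  strict: { read: "allow", write: "require_approval", destructive: "deny" },
  readonly: { read: "allow", write: "deny", destructive: "deny" },
};

// Look up the default decision for an endpoint's risk level in a given mode.
function defaultDecision(mode: PermissionMode, risk: RiskLevel): Decision {
  return DEFAULT_POLICY[mode][risk];
}

const strictDelete = defaultDecision("strict", "destructive");
const cautiousWrite = defaultDecision("cautious", "write");
```

Typing the table as a nested `Record` means the compiler flags a missing mode or risk level, which is handy if either list grows.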
Permission modes
type PermissionMode = "open" | "cautious" | "strict" | "readonly";
- open: All endpoints allowed. Use for trusted internal tools.
- cautious: Write/destructive require approval. Use for agentic flows with human oversight.
- strict: Destructive denied, write requires approval. Use for production with sensitive data.
- readonly: Only read endpoints allowed. Use for reporting and audit tools.
Overrides
Fine-tune per endpoint:
slack({
permissions: {
mode: "cautious",
overrides: {
"messages.send": "allow", // Skip approval for messages
"messages.delete": "deny", // Never allow deletion
"channels.archive": "allow", // Auto-allow archiving
},
},
});
Approval polling
When an endpoint requires approval, the SDK throws an ApprovalRequiredError:
import { ApprovalRequiredError } from "@fabricorg/integrations";
try {
await fabric.slack.api.channels.create({ name: "announcements" });
} catch (err) {
if (err instanceof ApprovalRequiredError) {
const { approvalId, endpoint, args } = err;
// Poll for approval
const approved = await pollApproval(approvalId, { timeout: 300_000 });
if (approved) {
// Retry with approval token
await fabric.slack.api.channels.create(args, {
headers: { "X-Approval-Id": approvalId },
});
}
}
}
Approval UI
The approval flow renders in the Fabric portal:
- Agent calls endpoint with riskLevel: "write"
- SDK throws ApprovalRequiredError with approvalId
- Portal shows approval card with endpoint, args, and risk level
- User approves/denies
- Agent retries with approval token
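The polling step between the approval card and the retry is left to the caller. One way it might look is sketched below. This `pollApproval` is a hypothetical helper, not an SDK export: the `fetchStatus` callback, the `intervalMs` option, and the `ApprovalStatus` values are all assumptions, since the status API is not specified here.

```typescript
// Hypothetical polling helper; the status source is injected by the caller.
type ApprovalStatus = "pending" | "approved" | "denied";

async function pollApproval(
  approvalId: string,
  opts: {
    timeout: number; // total time budget in milliseconds
    intervalMs?: number; // delay between checks (assumed default: 2s)
    fetchStatus: (approvalId: string) => Promise<ApprovalStatus>;
  },
): Promise<boolean> {
  const deadline = Date.now() + opts.timeout;
  const interval = opts.intervalMs ?? 2_000;
  while (true) {
    const status = await opts.fetchStatus(approvalId);
    if (status === "approved") return true;
    if (status === "denied") return false;
    // Give up if the next check would land past the deadline.
    if (Date.now() + interval > deadline) return false;
    await new Promise((resolve) => setTimeout(resolve, interval));
  }
}

// Usage with a fake status source that approves on the third check.
let checks = 0;
const outcome = pollApproval("apr_123", {
  timeout: 1_000,
  intervalMs: 5,
  fetchStatus: async () => (++checks < 3 ? "pending" : "approved"),
});
```

Returning `false` for both denial and timeout keeps the call site simple. A caller that needs to tell the two apart could return the final status instead.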
Error taxonomy
Base error classes
class FabricError extends Error {
code: string;
status: number;
retryable: boolean;
constructor(message: string, opts: { code: string; status: number; retryable?: boolean }) {
super(message);
this.code = opts.code;
this.status = opts.status;
this.retryable = opts.retryable ?? false;
}
}
class PermissionBlockedError extends FabricError {
constructor(endpoint: string, mode: PermissionMode) {
super(
`Endpoint ${endpoint} is blocked in ${mode} mode`,
{ code: "PERMISSION_BLOCKED", status: 403, retryable: false },
);
}
}
class ApprovalRequiredError extends FabricError {
approvalId: string;
endpoint: string;
args: unknown;
constructor(approvalId: string, endpoint: string, args: unknown) {
super(
`Approval required for ${endpoint}`,
{ code: "APPROVAL_REQUIRED", status: 403, retryable: false },
);
this.approvalId = approvalId;
this.endpoint = endpoint;
this.args = args;
}
}
class WebhookSignatureError extends FabricError {
constructor(provider: string) {
super(
`Invalid webhook signature for ${provider}`,
{ code: "WEBHOOK_SIGNATURE_INVALID", status: 401, retryable: false },
);
}
}
class RateLimitError extends FabricError {
retryAfter: number;
constructor(retryAfter: number) {
super(
`Rate limit exceeded. Retry after ${retryAfter}s`,
{ code: "RATE_LIMIT", status: 429, retryable: true },
);
this.retryAfter = retryAfter;
}
}
Error handling pattern
import {
FabricError,
PermissionBlockedError,
ApprovalRequiredError,
WebhookSignatureError,
RateLimitError,
} from "@fabricorg/integrations";
async function handleFabricError(err: unknown) {
if (err instanceof ApprovalRequiredError) {
await queueForApproval(err.approvalId, err.endpoint, err.args);
return { status: "pending_approval" };
}
if (err instanceof PermissionBlockedError) {
await notifyAdmin(`Blocked operation: ${err.message}`);
return { status: "blocked", reason: err.code };
}
if (err instanceof WebhookSignatureError) {
await securityAlert(`Invalid webhook: ${err.message}`);
return { status: "rejected" };
}
if (err instanceof RateLimitError) {
await scheduleRetry(err.retryAfter);
return { status: "rate_limited", retryAfter: err.retryAfter };
}
if (err instanceof FabricError) {
return { status: "error", code: err.code, retryable: err.retryable };
}
// Unknown error
throw err;
}
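The `retryable` flag and `retryAfter` value lend themselves to a small retry wrapper. The sketch below is one possible shape, not an SDK feature. `withRetry` and its options are hypothetical, the two error classes are trimmed local stand-ins so the example runs on its own, and the exponential fallback delay is an assumption.

```typescript
// Local stand-ins for the SDK error classes so this sketch is self-contained.
class FabricError extends Error {
  constructor(message: string, readonly code: string, readonly retryable: boolean) {
    super(message);
    Object.setPrototypeOf(this, new.target.prototype); // keep instanceof working
  }
}

class RateLimitError extends FabricError {
  constructor(readonly retryAfter: number) {
    super(`Rate limit exceeded. Retry after ${retryAfter}s`, "RATE_LIMIT", true);
  }
}

// Hypothetical helper: retry only errors flagged as retryable.
async function withRetry<T>(
  call: () => Promise<T>,
  opts: { maxAttempts: number; sleep?: (ms: number) => Promise<void> },
): Promise<T> {
  const sleep =
    opts.sleep ?? ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)));
  for (let attempt = 1; ; attempt++) {
    try {
      return await call();
    } catch (err) {
      const retryable = err instanceof FabricError && err.retryable;
      if (!retryable || attempt >= opts.maxAttempts) throw err;
      // retryAfter is in seconds; otherwise fall back to exponential backoff.
      const delayMs =
        err instanceof RateLimitError ? err.retryAfter * 1000 : 250 * 2 ** (attempt - 1);
      await sleep(delayMs);
    }
  }
}

// Usage: fail twice with a rate limit, then succeed. Sleep is stubbed out.
let attempts = 0;
const slept: number[] = [];
const retried = withRetry(
  async () => {
    attempts += 1;
    if (attempts < 3) throw new RateLimitError(2);
    return "sent";
  },
  {
    maxAttempts: 5,
    sleep: async (ms) => {
      slept.push(ms);
    },
  },
);
```

Non-retryable errors such as a blocked permission are rethrown immediately, so they can still be handled by a function like `handleFabricError`.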