Production primitives
Refresh lock + CAS, pre-emptive refresh, credential chain, Temporal helpers, rate-limit, dedup, enhancements.
The Integrations SDK ships seven composable primitives covering the production-grade OAuth, rate-limit, idempotency, and durability concerns that hand-rolled SDKs typically reinvent. Each lives behind a focused subpath; opt in only where you need them.
OAuth durability
@fabricorg/integrations/oauth — wrapWithPreemptiveRefresh
Wraps any fetch implementation. Before every request, it checks the account's expires_at and refreshes the token if it is within bufferSecs of expiring. Combined with KeyStore.acquireRefreshLock, concurrent requests share a single refresh exchange; the test suite verifies that 3 parallel calls trigger only 1 refresh.
import { wrapWithPreemptiveRefresh } from '@fabricorg/integrations/oauth';
const fetcher = wrapWithPreemptiveRefresh({
  fetcher: fetch,
  keyStore: yourKeyStore,
  integration: 'github',
  tenantId: ctx.tenantId,
  oauthConfig: githubOAuthConfig,
  bufferSecs: 300,
});
KeyStore.acquireRefreshLock + compareAndSetAccountField
Optional methods on the KeyStore interface. The InMemoryKeyStore ships both; Prisma-backed stores would back these with SELECT FOR UPDATE and conditional updates.
acquireRefreshLock(integration, tenantId, timeoutMs?): returns a release function. Subsequent callers wait until the first caller releases.
compareAndSetAccountField(integration, tenantId, field, expected, next): atomic write that commits only when the in-store value still equals expected. Returns false on conflict; the caller re-reads and retries.
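The lock and compare-and-set semantics above, together with the pre-emptive expiry check, can be sketched in memory. This is a minimal illustration and not the SDK's InMemoryKeyStore: SketchKeyStore, refreshIfExpiring, the simplified single-key signatures, and the lock key 'github:tenant-1' are all hypothetical, and the timeoutMs parameter is left out.

```typescript
// Hypothetical in-memory sketch; names and signatures are illustrative only.
type Release = () => void;

class SketchKeyStore {
  private fields = new Map<string, string>();
  private tails = new Map<string, Promise<void>>();

  get(key: string): string | undefined {
    return this.fields.get(key);
  }

  set(key: string, value: string): void {
    this.fields.set(key, value);
  }

  // Each caller queues behind the previous holder and receives a release function.
  async acquireRefreshLock(lockKey: string): Promise<Release> {
    const previous = this.tails.get(lockKey) ?? Promise.resolve();
    let release: Release = () => {};
    const current = new Promise<void>((resolve) => {
      release = resolve;
    });
    this.tails.set(lockKey, previous.then(() => current));
    await previous;
    return release;
  }

  // Commits only when the stored value still equals `expected`.
  compareAndSet(key: string, expected: string | undefined, next: string): boolean {
    if (this.fields.get(key) !== expected) return false;
    this.fields.set(key, next);
    return true;
  }
}

let refreshCalls = 0; // counts simulated token exchanges

async function refreshIfExpiring(store: SketchKeyStore, bufferSecs: number): Promise<string> {
  const isFresh = () => Number(store.get('expires_at') ?? 0) - Date.now() / 1000 > bufferSecs;
  if (isFresh()) return store.get('access_token') ?? '';

  const release = await store.acquireRefreshLock('github:tenant-1');
  try {
    // Re-check under the lock: an earlier holder may already have refreshed.
    if (isFresh()) return store.get('access_token') ?? '';
    const before = store.get('access_token');
    refreshCalls += 1;
    await new Promise((resolve) => setTimeout(resolve, 10)); // stand-in for the token exchange
    // Write the new token only if nobody replaced the old one in the meantime.
    if (store.compareAndSet('access_token', before, `token-${refreshCalls}`)) {
      store.set('expires_at', String(Math.floor(Date.now() / 1000) + 3600));
    }
    return store.get('access_token') ?? '';
  } finally {
    release();
  }
}
```

Under these assumptions, three parallel refreshIfExpiring calls against an expired token perform one exchange: the second and third callers find a fresh expires_at when they re-check under the lock.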
Credential chain — resolveCredentialChain
Layered lookup (project → org → user → global). Plugins use this in their keyBuilder so a single token resolution honors the org's OAuth app config, falling back to user-level tokens for personal-use flows, and finally a global default.
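The layered lookup can be pictured as a first-match walk over an ordered list of scopes. The sketch below is an assumption-laden illustration rather than the SDK's implementation: buildChain, resolveFirst, the Lookup callback, and the 'org:' and 'user:' scope prefixes are hypothetical (only the 'project:<id>' form appears in these docs).

```typescript
// Hypothetical types for illustration only; the SDK's real signatures may differ.
type Scope = string; // e.g. "project:repo-rewrite", "org:acme", "user:42", "global"
type Lookup = (scope: Scope, field: string) => Promise<string | undefined>;

// Builds the lookup order project -> org -> user -> global, skipping missing IDs.
function buildChain(ids: { projectId?: string; orgId?: string; userId?: string }): Scope[] {
  const chain: Scope[] = [];
  if (ids.projectId) chain.push(`project:${ids.projectId}`);
  if (ids.orgId) chain.push(`org:${ids.orgId}`);
  if (ids.userId) chain.push(`user:${ids.userId}`);
  chain.push('global');
  return chain;
}

// Returns the first non-empty value found while walking the chain in order.
async function resolveFirst(
  lookup: Lookup,
  chain: Scope[],
  field: string,
): Promise<string | undefined> {
  for (const scope of chain) {
    const value = await lookup(scope, field);
    if (value !== undefined && value !== '') return value;
  }
  return undefined;
}
```

With this shape, a project without its own token falls through to the org-level value, and a request with no matching scope at all ends at the global default.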
import { resolveCredentialChain, defaultCredentialChain } from '@fabricorg/integrations/oauth';
const accessToken = await resolveCredentialChain({
  keyStore,
  integration: 'github',
  field: 'access_token',
  chain: defaultCredentialChain({
    projectId: ctx.projectId,
    orgId: ctx.organizationId,
    userId: ctx.userId,
  }),
});
Project-scoped credentials — fabric.withProject
Sugar over withTenant('project:<id>'). Use when a single org has per-project tokens (e.g. multiple GitHub installations under one organization).
const fabric = createFabric({ multiTenancy: true, plugins: [...] });
const projectFabric = fabric.withProject('repo-rewrite');
await projectFabric.github.api.repos.list({});
Durability
@fabricorg/integrations/temporal — defineFabricActivities
Lightweight Temporal-activity wrappers. The SDK itself has no @temporalio/* dependency: defineFabricActivities returns plain async functions that you register with your own worker.
import { defineFabricActivities, DEFAULT_RETRY_POLICY } from '@fabricorg/integrations/temporal';
import { Worker } from '@temporalio/worker';
const activities = defineFabricActivities({
  keyStore,
  fabric,
  plugins: ['github', 'slack'], // worker-shard allowlist
});
await Worker.create({
  workflowsPath: './workflows',
  activities,
});
Two activities:
refreshOAuthToken({ integration, tenantId, oauthConfig }): schedule as a cron job or trigger by signal for proactive refresh.
processWebhookActivity({ request }): durably consume inbound webhook deliveries.
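To make "plain async functions" concrete, here is a rough sketch of how an activity factory with a shard allowlist might look. Everything in it is hypothetical (defineSketchActivities, the refresh callback, and especially the choice to throw for integrations outside the allowlist); the SDK's actual behavior for non-allowlisted plugins is not documented here.

```typescript
// Hypothetical sketch; not the SDK's defineFabricActivities.
type RefreshInput = { integration: string; tenantId: string };

function defineSketchActivities(opts: {
  plugins: string[]; // integrations this worker shard is willing to handle
  refresh: (input: RefreshInput) => Promise<void>; // stand-in for the real token exchange
}) {
  const allowed = new Set(opts.plugins);
  return {
    // A plain async function: nothing Temporal-specific is imported or required.
    async refreshOAuthToken(input: RefreshInput): Promise<{ refreshed: boolean }> {
      if (!allowed.has(input.integration)) {
        // Assumed behavior: fail loudly so the work is not silently dropped.
        throw new Error(`integration "${input.integration}" is not handled by this worker shard`);
      }
      await opts.refresh(input);
      return { refreshed: true };
    },
  };
}
```

Because the returned object contains only ordinary async functions, it can be passed as the activities option of a worker or called directly in unit tests.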
Rate limits
@fabricorg/integrations/rate-limit — applyRateLimit
Wraps any fetcher. On a 429 (or whichever status codes the config declares), it waits for the duration given by the Retry-After header, falling back to exponential backoff when no usable header is present, and then retries.
import { applyRateLimit, GITHUB_RATE_LIMITS } from '@fabricorg/integrations/rate-limit';
const fetcher = applyRateLimit(fetch, GITHUB_RATE_LIMITS);
Preset configs ship for GitHub, Slack, and Stripe, plus a STANDARD_RATE_LIMITS baseline. Each config declares statusCodes, retryAfterHeaders, resetHeaderFormat (seconds | epoch-seconds | milliseconds | http-date), maxAttempts, and baseBackoffMs/backoffMultiplier/maxBackoffMs. An optional shouldRetry predicate covers cases like GitHub's ambiguous 403, which can signal either a rate limit or a permission error.
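The way these fields could combine into a wait time is sketched below. The field names come from the config description above, but the helper functions (headerDelayMs, backoffDelayMs, retryDelayMs) are hypothetical, and treating milliseconds as a relative duration is an assumption.

```typescript
// Hypothetical sketch of delay computation; not the SDK's applyRateLimit internals.
type ResetHeaderFormat = 'seconds' | 'epoch-seconds' | 'milliseconds' | 'http-date';

interface BackoffConfig {
  baseBackoffMs: number;
  backoffMultiplier: number;
  maxBackoffMs: number;
}

// Converts a raw header value into a wait in milliseconds, or undefined if unparseable.
function headerDelayMs(raw: string, format: ResetHeaderFormat, nowMs: number): number | undefined {
  if (format === 'http-date') {
    const at = Date.parse(raw);
    return Number.isNaN(at) ? undefined : Math.max(0, at - nowMs);
  }
  const n = Number(raw);
  if (raw.trim() === '' || !Number.isFinite(n)) return undefined;
  if (format === 'seconds') return Math.max(0, n * 1000); // relative seconds
  if (format === 'milliseconds') return Math.max(0, n); // assumed to be a relative duration
  return Math.max(0, n * 1000 - nowMs); // epoch-seconds: absolute reset time
}

// Exponential backoff for the given 1-based attempt, capped at maxBackoffMs.
function backoffDelayMs(attempt: number, cfg: BackoffConfig): number {
  const uncapped = cfg.baseBackoffMs * Math.pow(cfg.backoffMultiplier, attempt - 1);
  return Math.min(cfg.maxBackoffMs, uncapped);
}

// Prefers the server's hint; falls back to backoff when the header is missing or unusable.
function retryDelayMs(
  raw: string | null,
  format: ResetHeaderFormat,
  attempt: number,
  cfg: BackoffConfig,
  nowMs: number = Date.now(),
): number {
  const fromHeader = raw === null ? undefined : headerDelayMs(raw, format, nowMs);
  return fromHeader ?? backoffDelayMs(attempt, cfg);
}
```

A wrapper built on this would sleep for retryDelayMs(...) between attempts and stop after maxAttempts. Passing nowMs explicitly keeps the computation deterministic in tests.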
Idempotency
@fabricorg/integrations/events — InMemoryDedupeStore + dedupedHandle
Webhook providers retry on transient failures. Without dedup, side-effects fire twice. The dedup store is an atomic check-and-set:
import { InMemoryDedupeStore, dedupedHandle } from '@fabricorg/integrations/events';
const store = new InMemoryDedupeStore();
const result = await dedupedHandle({
  store,
  key: payload.event.id, // Stripe: event.id; GitHub: X-GitHub-Delivery; etc.
  ttlMs: 3 * 24 * 60 * 60 * 1000,
  handler: () => processInternal(payload),
});
If the key was already processed, the cached response is returned without re-running handler.
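A check-and-set store with a TTL can be sketched in a few lines. This is an illustrative in-memory version under stated assumptions, not the SDK's InMemoryDedupeStore: the SketchDedupeStore class and its claim method are hypothetical, and dropping the entry when the handler fails is a design choice made here so that a provider retry can run again.

```typescript
// Hypothetical sketch; class and method names are illustrative only.
class SketchDedupeStore<T> {
  private entries = new Map<string, { expiresAt: number; result: Promise<T> }>();

  // The check and the set happen in one synchronous step, so two concurrent
  // deliveries with the same key cannot both start the handler.
  claim(key: string, ttlMs: number, run: () => Promise<T>, nowMs: number = Date.now()): Promise<T> {
    const hit = this.entries.get(key);
    if (hit && hit.expiresAt > nowMs) return hit.result; // duplicate: reuse cached result

    const result = run();
    this.entries.set(key, { expiresAt: nowMs + ttlMs, result });

    // Assumed policy: forget failed attempts so a later retry re-runs the handler.
    result.catch(() => {
      if (this.entries.get(key)?.result === result) this.entries.delete(key);
    });
    return result;
  }
}
```

Caching the promise rather than the resolved value means a duplicate that arrives while the first handler is still running waits for the same result instead of starting a second run.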
Provider polish
@fabricorg/integrations/enhancements
Opt-in helpers for noise reduction in agent context:
stripSlackMrkdwn(text): converts *bold*, _italic_, <@U123>, <#C1|name>, <http://x|label>, and HTML entities to plain text.
stripSlackMrkdwnAsync(text, resolveUser?): async variant that resolves <@U123> mentions via a callback.
makeSlackResolverCache({ fetchUser, fetchChannel, maxEntries, ttlMs }): LRU-with-TTL cache for users.info / conversations.info lookups. Defaults to 500 entries and a 1-hour TTL.
normalizeSlackThreadId(ts): pads the microseconds part to 6 digits.
truncateSlackContent(text, maxChars=500): strips, then clamps with an ellipsis.
stripMicrosoftHtml(html): strips tags and decodes entities (the named entities for & < > " ' plus the numeric &#NN; and &#xHH; forms).
truncateMicrosoftContent(html, maxChars): strips, then clamps.
LRUCache<K,V>(maxEntries, ttlMs): generic primitive used by the resolver cache.
These are helpers, not plugin wrappers. The plugin layer stays thin (Corsair-shape contract); response post-processors opt into the enhancements at call-site.
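An LRU-with-TTL cache of the kind the resolver cache relies on can be approximated with a Map, which iterates in insertion order. The sketch below is a generic illustration, not the SDK's LRUCache: the class name SketchLruCache and its get/set method shapes are assumptions, while the defaults mirror the 500-entry, 1-hour values mentioned above.

```typescript
// Hypothetical sketch; not the SDK's LRUCache implementation.
class SketchLruCache<K, V> {
  private entries = new Map<K, { value: V; expiresAt: number }>();

  constructor(
    private maxEntries: number = 500,
    private ttlMs: number = 60 * 60 * 1000,
  ) {}

  get(key: K, nowMs: number = Date.now()): V | undefined {
    const hit = this.entries.get(key);
    if (!hit) return undefined;
    if (hit.expiresAt <= nowMs) {
      this.entries.delete(key); // expired: drop it and report a miss
      return undefined;
    }
    // Re-insert to mark the entry as most recently used.
    this.entries.delete(key);
    this.entries.set(key, hit);
    return hit.value;
  }

  set(key: K, value: V, nowMs: number = Date.now()): void {
    this.entries.delete(key);
    this.entries.set(key, { value, expiresAt: nowMs + this.ttlMs });
    if (this.entries.size > this.maxEntries) {
      // The first key in iteration order is the least recently used.
      const oldest = this.entries.keys().next().value as K;
      this.entries.delete(oldest);
    }
  }
}
```

Wrapping a users.info lookup with such a cache means repeated mentions of the same user in one thread cost a single API call until the entry expires or is evicted.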
Putting it together — production GitHub setup
import { createFabric } from '@fabricorg/integrations';
import { github } from '@fabricorg/integrations/plugins';
import {
  wrapWithPreemptiveRefresh,
  resolveCredentialChain,
  defaultCredentialChain,
} from '@fabricorg/integrations/oauth';
import { applyRateLimit, GITHUB_RATE_LIMITS } from '@fabricorg/integrations/rate-limit';
import { dedupedHandle, InMemoryDedupeStore } from '@fabricorg/integrations/events';
import { processWebhook } from '@fabricorg/integrations/webhooks';
const dedupe = new InMemoryDedupeStore();
const githubFetcher = applyRateLimit(
  wrapWithPreemptiveRefresh({
    fetcher: fetch,
    keyStore,
    integration: 'github',
    tenantId: ctx.tenantId,
    oauthConfig: githubOAuthConfig,
    bufferSecs: 300,
  }),
  GITHUB_RATE_LIMITS,
);
const fabric = createFabric({
  multiTenancy: true,
  plugins: [
    github({
      token: '', // filled by keyBuilder via credential chain
      fetcher: githubFetcher,
    }),
  ],
});
// Webhook receiver:
app.post('/webhooks/github', async (req) => {
  const deliveryId = req.headers['x-github-delivery'];
  // Without a delivery ID there is nothing to deduplicate on, so reject the request.
  if (typeof deliveryId !== 'string') throw new Error('Missing X-GitHub-Delivery header');
  return dedupedHandle({
    store: dedupe,
    key: deliveryId,
    ttlMs: 24 * 60 * 60 * 1000,
    handler: () => processWebhook(fabric, req.headers, req.body),
  });
});
That's the production stack: refresh lock + CAS at the KeyStore tier, pre-emptive refresh + rate-limit on every outbound call, dedup on every inbound webhook.