Data Converters
Background reading: Data Converters in Temporal
Contents:
Default Data Converter
In TypeScript, the default Data Converter supports:
undefined
Uint8Array
- JSON
Custom Data Converter
API doc: DataConverter
To send values that are not JSON-serializable like BigInt
s or Date
s, provide a custom Data Converter to the Client and Worker:
Data Converters have two parts:
PayloadConverter
: sync methods that sometimes run inside the Workflow isolate (and are thus limited)PayloadCodec
: async methods that are run outside the isolate
- TypeScript
- JavaScript
interface DataConverter {
payloadConverterPath?: string;
payloadCodecs?: PayloadCodec[];
}
PayloadConverter
API doc: PayloadConverter
- TypeScript
- JavaScript
interface PayloadConverter {
/**
* Converts a value to a {@link Payload}.
* @param value The value to convert. Example values include the Workflow args sent by the client and the values returned by a Workflow or Activity.
*/
toPayload<T>(value: T): Payload;
/**
* Converts a {@link Payload} back to a value.
*/
fromPayload<T>(payload: Payload): T;
}
Custom implementation
Some example implementations are in the SDK itself:
There's also a sample project that creates an EJSON custom PayloadConverter
: samples-typescript/ejson
It implements PayloadConverterWithEncoding
instead of PayloadConverter
so that it could be used with CompositePayloadConverter
:
ejson/src/ejson-payload-converter.ts
- TypeScript
- JavaScript
import {
EncodingType,
errorMessage,
METADATA_ENCODING_KEY,
Payload,
PayloadConverterWithEncoding,
str,
u8,
} from '@temporalio/common';
import { PayloadConverterError } from '@temporalio/internal-workflow-common';
import EJSON from 'ejson';
/**
* Converts between values and [EJSON](https://docs.meteor.com/api/ejson.html) Payloads.
*/
export class EjsonPayloadConverter implements PayloadConverterWithEncoding {
// Use 'json/plain' so that Payloads are displayed in the UI
public encodingType = 'json/plain' as EncodingType;
public toPayload(value: unknown): Payload | undefined {
if (value === undefined) return undefined;
let ejson;
try {
ejson = EJSON.stringify(value);
} catch (e) {
throw new UnsupportedEjsonTypeError(
`Can't run EJSON.stringify on this value: ${value}. Either convert it (or its properties) to EJSON-serializable values (see https://docs.meteor.com/api/ejson.html ), or create a custom data converter. EJSON.stringify error message: ${errorMessage(
e
)}`,
e as Error
);
}
return {
metadata: {
[METADATA_ENCODING_KEY]: u8('json/plain'),
// Include an additional metadata field to indicate that this is an EJSON payload
format: u8('extended'),
},
data: u8(ejson),
};
}
public fromPayload<T>(content: Payload): T {
return content.data ? EJSON.parse(str(content.data)) : content.data;
}
}
export class UnsupportedEjsonTypeError extends PayloadConverterError {
public readonly name: string = 'UnsupportedJsonTypeError';
constructor(message: string | undefined, public readonly cause?: Error) {
super(message ?? undefined);
}
}
import { errorMessage, METADATA_ENCODING_KEY, str, u8, } from '@temporalio/common';
import { PayloadConverterError } from '@temporalio/internal-workflow-common';
import EJSON from 'ejson';
/**
* Converts between values and [EJSON](https://docs.meteor.com/api/ejson.html) Payloads.
*/
export class EjsonPayloadConverter {
// Use 'json/plain' so that Payloads are displayed in the UI
encodingType = 'json/plain';
toPayload(value) {
if (value === undefined)
return undefined;
let ejson;
try {
ejson = EJSON.stringify(value);
}
catch (e) {
throw new UnsupportedEjsonTypeError(`Can't run EJSON.stringify on this value: ${value}. Either convert it (or its properties) to EJSON-serializable values (see https://docs.meteor.com/api/ejson.html ), or create a custom data converter. EJSON.stringify error message: ${errorMessage(e)}`, e);
}
return {
metadata: {
[METADATA_ENCODING_KEY]: u8('json/plain'),
// Include an additional metadata field to indicate that this is an EJSON payload
format: u8('extended'),
},
data: u8(ejson),
};
}
fromPayload(content) {
return content.data ? EJSON.parse(str(content.data)) : content.data;
}
}
export class UnsupportedEjsonTypeError extends PayloadConverterError {
cause;
name = 'UnsupportedJsonTypeError';
constructor(message, cause) {
super(message ?? undefined);
this.cause = cause;
}
}
Then we instantiate one and export it:
ejson/src/payload-converter.ts
- TypeScript
- JavaScript
import { CompositePayloadConverter, UndefinedPayloadConverter } from '@temporalio/common';
import { EjsonPayloadConverter } from './ejson-payload-converter';
export const payloadConverter = new CompositePayloadConverter(
new UndefinedPayloadConverter(),
new EjsonPayloadConverter()
);
import { CompositePayloadConverter, UndefinedPayloadConverter } from '@temporalio/common';
import { EjsonPayloadConverter } from './ejson-payload-converter';
export const payloadConverter = new CompositePayloadConverter(new UndefinedPayloadConverter(), new EjsonPayloadConverter());
We provide it to the Worker and Client:
- TypeScript
- JavaScript
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
taskQueue: 'ejson',
dataConverter: { payloadConverterPath: require.resolve('./payload-converter') },
});
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
taskQueue: 'ejson',
dataConverter: { payloadConverterPath: require.resolve('./payload-converter') },
});
- TypeScript
- JavaScript
const client = new WorkflowClient({
dataConverter: { payloadConverterPath: require.resolve('./payload-converter') },
});
const client = new WorkflowClient({
dataConverter: { payloadConverterPath: require.resolve('./payload-converter') },
});
Then we can use supported data types in arguments:
- TypeScript
- JavaScript
const user: User = {
id: uuid(),
// age: 1000n, BigInt isn't supported
hp: Infinity,
matcher: /.*Stormblessed/,
token: Uint8Array.from([1, 2, 3]),
createdAt: new Date(),
};
const handle = await client.start(example, {
args: [user],
taskQueue: 'ejson',
workflowId: `example-user-${user.id}`,
});
const user = {
id: uuid(),
// age: 1000n, BigInt isn't supported
hp: Infinity,
matcher: /.*Stormblessed/,
token: Uint8Array.from([1, 2, 3]),
createdAt: new Date(),
};
const handle = await client.start(example, {
args: [user],
taskQueue: 'ejson',
workflowId: `example-user-${user.id}`,
});
And they get parsed correctly for the Workflow:
- TypeScript
- JavaScript
import type { Result, User } from './types';
export async function example(user: User): Promise<Result> {
const success =
user.createdAt.getTime() < Date.now() &&
user.hp > 50 &&
user.matcher.test('Kaladin Stormblessed') &&
user.token instanceof Uint8Array;
return { success, at: new Date() };
}
export async function example(user) {
const success = user.createdAt.getTime() < Date.now() &&
user.hp > 50 &&
user.matcher.test('Kaladin Stormblessed') &&
user.token instanceof Uint8Array;
return { success, at: new Date() };
}
Protobufs
To serialize values as Protocol Buffers:
Use
protobufjs
Use runtime-loaded messages (not generated classes) and
MessageClass.create
(notnew MessageClass()
)Generate
json-module.js
with a command like:pbjs -t json-module -w commonjs -o protos/json-module.js protos/*.proto
Patch
json-module.js
:
const { patchProtobufRoot } = require('@temporalio/common/lib/converter/patch-protobuf-root');
const unpatchedRoot = require('./json-module');
module.exports = patchProtobufRoot(unpatchedRoot);
Generate
root.d.ts
with:pbjs -t static-module protos/*.proto | pbts -o protos/root.d.ts -
Create a
DefaultPayloadConverterWithProtobufs
:
protobufs/src/payload-converter.ts
- TypeScript
- JavaScript
import { DefaultPayloadConverterWithProtobufs } from '@temporalio/common/lib/protobufs';
import root from '../protos/root';
export const payloadConverter = new DefaultPayloadConverterWithProtobufs({ protobufRoot: root });
import { DefaultPayloadConverterWithProtobufs } from '@temporalio/common/lib/protobufs';
import root from '../protos/root';
export const payloadConverter = new DefaultPayloadConverterWithProtobufs({ protobufRoot: root });
Alternatively, we can use Protobuf Payload Converters directly, or with other converters. If we know that we only use Protobuf objects, and we want them binary encoded (which saves space over proto3 JSON, but can't be viewed in the Web UI), we could do:
- TypeScript
- JavaScript
import { ProtobufBinaryPayloadConverter } from '@temporalio/common/lib/protobufs';
import root from '../protos/root';
export const payloadConverter = new ProtobufBinaryPayloadConverter(root);
import { ProtobufBinaryPayloadConverter } from '@temporalio/common/lib/protobufs';
import root from '../protos/root';
export const payloadConverter = new ProtobufBinaryPayloadConverter(root);
Similarly, if we wanted binary encoded Protobufs in addition to the other default types, we could do:
- TypeScript
- JavaScript
import {
BinaryPayloadConverter,
CompositePayloadConverter,
JsonPayloadConverter,
UndefinedPayloadConverter,
} from '@temporalio/common';
import { ProtobufBinaryPayloadConverter } from '@temporalio/common/lib/protobufs';
import root from '../protos/root';
export const payloadConverter = new CompositePayloadConverter(
new UndefinedPayloadConverter(),
new BinaryPayloadConverter(),
new ProtobufBinaryPayloadConverter(root),
new JsonPayloadConverter(),
);
import { BinaryPayloadConverter, CompositePayloadConverter, JsonPayloadConverter, UndefinedPayloadConverter, } from '@temporalio/common';
import { ProtobufBinaryPayloadConverter } from '@temporalio/common/lib/protobufs';
import root from '../protos/root';
export const payloadConverter = new CompositePayloadConverter(new UndefinedPayloadConverter(), new BinaryPayloadConverter(), new ProtobufBinaryPayloadConverter(root), new JsonPayloadConverter());
- Provide it to the Worker:
- TypeScript
- JavaScript
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
activities,
taskQueue: 'protobufs',
dataConverter: { payloadConverterPath: require.resolve('./payload-converter') },
});
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
activities,
taskQueue: 'protobufs',
dataConverter: { payloadConverterPath: require.resolve('./payload-converter') },
});
- Provide it to the Client:
- TypeScript
- JavaScript
import { WorkflowClient } from '@temporalio/client';
import { v4 as uuid } from 'uuid';
import { foo, ProtoResult } from '../protos/root';
import { example } from './workflows';
async function run() {
const client = new WorkflowClient({
dataConverter: { payloadConverterPath: require.resolve('./payload-converter') },
});
const handle = await client.start(example, {
args: [foo.bar.ProtoInput.create({ name: 'Proto', age: 2 })],
// can't do:
// args: [new foo.bar.ProtoInput({ name: 'Proto', age: 2 })],
taskQueue: 'protobufs',
workflowId: 'my-business-id-' + uuid(),
});
console.log(`Started workflow ${handle.workflowId}`);
const result: ProtoResult = await handle.result();
console.log(result.toJSON());
}
import { WorkflowClient } from '@temporalio/client';
import { v4 as uuid } from 'uuid';
import { foo } from '../protos/root';
import { example } from './workflows';
async function run() {
const client = new WorkflowClient({
dataConverter: { payloadConverterPath: require.resolve('./payload-converter') },
});
const handle = await client.start(example, {
args: [foo.bar.ProtoInput.create({ name: 'Proto', age: 2 })],
// can't do:
// args: [new foo.bar.ProtoInput({ name: 'Proto', age: 2 })],
taskQueue: 'protobufs',
workflowId: 'my-business-id-' + uuid(),
});
console.log(`Started workflow ${handle.workflowId}`);
const result = await handle.result();
console.log(result.toJSON());
}
- Use protobufs in our Workflows and Activities:
- TypeScript
- JavaScript
import { proxyActivities } from '@temporalio/workflow';
import { foo, ProtoResult } from '../protos/root';
import type * as activities from './activities';
const { protoActivity } = proxyActivities<typeof activities>({
startToCloseTimeout: '1 minute',
});
export async function example(input: foo.bar.ProtoInput): Promise<ProtoResult> {
const result = await protoActivity(input);
return result;
}
import { proxyActivities } from '@temporalio/workflow';
const { protoActivity } = proxyActivities({
startToCloseTimeout: '1 minute',
});
export async function example(input) {
const result = await protoActivity(input);
return result;
}
- TypeScript
- JavaScript
import { foo, ProtoResult } from '../protos/root';
export async function protoActivity(input: foo.bar.ProtoInput): Promise<ProtoResult> {
return ProtoResult.create({ sentence: `${input.name} is ${input.age} years old.` });
}
import { ProtoResult } from '../protos/root';
export async function protoActivity(input) {
return ProtoResult.create({ sentence: `${input.name} is ${input.age} years old.` });
}
PayloadCodec
API doc: PayloadCodec
The default PayloadCodec
does nothing. To create a custom one, we implement this interface:
- TypeScript
- JavaScript
interface PayloadCodec {
/**
* Encode an array of {@link Payload}s for sending over the wire.
* @param payloads May have length 0.
*/
encode(payloads: Payload[]): Promise<Payload[]>;
/**
* Decode an array of {@link Payload}s received from the wire.
*/
decode(payloads: Payload[]): Promise<Payload[]>;
}
Encryption
Background: Data Converter ➡️ Encryption
Here's an example class that implements the PayloadCodec
interface:
encryption/src/encryption-codec.ts
- TypeScript
- JavaScript
import { webcrypto as crypto } from 'node:crypto';
import { METADATA_ENCODING_KEY, Payload, PayloadCodec, str, u8, ValueError } from '@temporalio/common';
import { temporal } from '@temporalio/proto';
import { decrypt, encrypt } from './crypto';
const ENCODING = 'binary/encrypted';
const METADATA_ENCRYPTION_KEY_ID = 'encryption-key-id';
export class EncryptionCodec implements PayloadCodec {
constructor(protected readonly keys: Map<string, crypto.CryptoKey>, protected readonly defaultKeyId: string) {}
static async create(keyId: string): Promise<EncryptionCodec> {
const keys = new Map<string, crypto.CryptoKey>();
keys.set(keyId, await fetchKey(keyId));
return new this(keys, keyId);
}
async encode(payloads: Payload[]): Promise<Payload[]> {
return Promise.all(
payloads.map(async (payload) => ({
metadata: {
[METADATA_ENCODING_KEY]: u8(ENCODING),
[METADATA_ENCRYPTION_KEY_ID]: u8(this.defaultKeyId),
},
// Encrypt entire payload, preserving metadata
data: await encrypt(
temporal.api.common.v1.Payload.encode(payload).finish(),
this.keys.get(this.defaultKeyId)! // eslint-disable-line @typescript-eslint/no-non-null-assertion
),
}))
);
}
async decode(payloads: Payload[]): Promise<Payload[]> {
return Promise.all(
payloads.map(async (payload) => {
if (!payload.metadata || str(payload.metadata[METADATA_ENCODING_KEY]) !== ENCODING) {
return payload;
}
if (!payload.data) {
throw new ValueError('Payload data is missing');
}
const keyIdBytes = payload.metadata[METADATA_ENCRYPTION_KEY_ID];
if (!keyIdBytes) {
throw new ValueError('Unable to decrypt Payload without encryption key id');
}
const keyId = str(keyIdBytes);
let key = this.keys.get(keyId);
if (!key) {
key = await fetchKey(keyId);
this.keys.set(keyId, key);
}
const decryptedPayloadBytes = await decrypt(payload.data, key);
console.log('Decrypting payload.data:', payload.data);
return temporal.api.common.v1.Payload.decode(decryptedPayloadBytes);
})
);
}
}
async function fetchKey(_keyId: string): Promise<crypto.CryptoKey> {
// In production, fetch key from a key management system (KMS). You may want to memoize requests if you'll be decoding
// Payloads that were encrypted using keys other than defaultKeyId.
const key = Buffer.from('test-key-test-key-test-key-test!');
const cryptoKey = await crypto.subtle.importKey(
'raw',
key,
{
name: 'AES-GCM',
},
true,
['encrypt', 'decrypt']
);
return cryptoKey;
}
import { webcrypto as crypto } from 'node:crypto';
import { METADATA_ENCODING_KEY, str, u8, ValueError } from '@temporalio/common';
import { temporal } from '@temporalio/proto';
import { decrypt, encrypt } from './crypto';
const ENCODING = 'binary/encrypted';
const METADATA_ENCRYPTION_KEY_ID = 'encryption-key-id';
export class EncryptionCodec {
keys;
defaultKeyId;
constructor(keys, defaultKeyId) {
this.keys = keys;
this.defaultKeyId = defaultKeyId;
}
static async create(keyId) {
const keys = new Map();
keys.set(keyId, await fetchKey(keyId));
return new this(keys, keyId);
}
async encode(payloads) {
return Promise.all(payloads.map(async (payload) => ({
metadata: {
[METADATA_ENCODING_KEY]: u8(ENCODING),
[METADATA_ENCRYPTION_KEY_ID]: u8(this.defaultKeyId),
},
// Encrypt entire payload, preserving metadata
data: await encrypt(temporal.api.common.v1.Payload.encode(payload).finish(), this.keys.get(this.defaultKeyId) // eslint-disable-line @typescript-eslint/no-non-null-assertion
),
})));
}
async decode(payloads) {
return Promise.all(payloads.map(async (payload) => {
if (!payload.metadata || str(payload.metadata[METADATA_ENCODING_KEY]) !== ENCODING) {
return payload;
}
if (!payload.data) {
throw new ValueError('Payload data is missing');
}
const keyIdBytes = payload.metadata[METADATA_ENCRYPTION_KEY_ID];
if (!keyIdBytes) {
throw new ValueError('Unable to decrypt Payload without encryption key id');
}
const keyId = str(keyIdBytes);
let key = this.keys.get(keyId);
if (!key) {
key = await fetchKey(keyId);
this.keys.set(keyId, key);
}
const decryptedPayloadBytes = await decrypt(payload.data, key);
console.log('Decrypting payload.data:', payload.data);
return temporal.api.common.v1.Payload.decode(decryptedPayloadBytes);
}));
}
}
async function fetchKey(_keyId) {
// In production, fetch key from a key management system (KMS). You may want to memoize requests if you'll be decoding
// Payloads that were encrypted using keys other than defaultKeyId.
const key = Buffer.from('test-key-test-key-test-key-test!');
const cryptoKey = await crypto.subtle.importKey('raw', key, {
name: 'AES-GCM',
}, true, ['encrypt', 'decrypt']);
return cryptoKey;
}
The encryption and decryption code is in src/crypto.ts
. Since encryption is CPU-intensive, and doing AES with Node's built-in crypto module blocks the main thread, we use @ronomon/crypto-async
, which uses Node's threadpool.
As before, we provide a custom data converter to the Client and Worker:
- TypeScript
- JavaScript
const client = new WorkflowClient({
dataConverter: await getDataConverter(),
});
const handle = await client.start(example, {
args: ['Alice: Private message for Bob.'],
taskQueue: 'encryption',
workflowId: `my-business-id-${uuid()}`,
});
console.log(`Started workflow ${handle.workflowId}`);
console.log(await handle.result());
const client = new WorkflowClient({
dataConverter: await getDataConverter(),
});
const handle = await client.start(example, {
args: ['Alice: Private message for Bob.'],
taskQueue: 'encryption',
workflowId: `my-business-id-${uuid()}`,
});
console.log(`Started workflow ${handle.workflowId}`);
console.log(await handle.result());
- TypeScript
- JavaScript
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
taskQueue: 'encryption',
dataConverter: await getDataConverter(),
});
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
taskQueue: 'encryption',
dataConverter: await getDataConverter(),
});
When the Client sends 'Alice: Private message for Bob.'
to the Workflow, it gets encrypted on the Client and decrypted in the Worker. The Workflow receives the decrypted message and appends another message. When it returns that longer string, the string gets encrypted by the Worker and decrypted by the Client.
- TypeScript
- JavaScript
export async function example(message: string): Promise<string> {
return `${message}\nBob: Hi Alice, I'm Workflow Bob.`;
}
export async function example(message) {
return `${message}\nBob: Hi Alice, I'm Workflow Bob.`;
}