Developer's guide - Foundations
The Foundations section of the Temporal Developer's guide covers the minimum set of concepts and implementation details needed to build and run a Temporal Application; that is, all the relevant steps to start a Workflow Execution. A Temporal Workflow Execution is a durable, scalable, reliable, and reactive function execution, and it is the main unit of execution of a Temporal Application. An Activity Execution is the full chain of Activity Task Executions.
This guide is a work in progress. Some sections may be incomplete or missing for some languages. Information may change at any time.
If you can't find what you are looking for in the Developer's guide, it could be in older docs for SDKs.
In this section you can find the following:
- How to run a dev Cluster
- How to add your SDK
- How to create a Temporal Client
- How to develop a Workflow
- How to develop an Activity
- How to start an Activity Execution
- How to run a Worker Process
- How to start a Workflow Execution
Run a dev Cluster
The following sections list various methods of deploying your Temporal Clusters. A Temporal Cluster is the Temporal Server paired with persistence.
- Temporalite: This distribution of Temporal runs as a single process with zero runtime dependencies.
- Docker: Using Docker Compose simplifies developing your Temporal Application.
- Gitpod: One-click deployments are available for Go and TypeScript.
For information on deploying a production environment, see the Temporal Cloud documentation.
Temporalite
Temporalite is a distribution of Temporal that runs as a single process with zero runtime dependencies. It supports persistence to disk and in-memory mode through SQLite.
Prerequisites
Temporalite requires Go 1.18 or later.
Build and start Temporalite
The following steps start and run a Temporal Cluster.
- Build from source.
git clone https://github.com/temporalio/temporalite.git
cd temporalite
go build ./cmd/temporalite
- Start Temporalite by using the start command.
temporalite start --namespace default
Replace default with your Namespace Name.
Results: You should have Temporal Cluster running at http://127.0.0.1:7233 and the Temporal Web UI at http://127.0.0.1:8233.
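To confirm that the two endpoints named in the Results line are accepting connections, a small check like the following can help. It is only a sketch that uses the Python standard library: `is_port_open` is a hypothetical helper, not part of any Temporal SDK, and a successful TCP connection shows only that something is listening on the port, not that the Cluster is healthy.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # 7233 and 8233 are the ports reported in the Results line above.
    for name, port in (("Temporal Cluster", 7233), ("Temporal Web UI", 8233)):
        state = "reachable" if is_port_open("127.0.0.1", port) else "not reachable"
        print(f"{name} on 127.0.0.1:{port}: {state}")
```

For the Docker Compose setup described later, the Web UI port would be 8080 instead of 8233.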
Docker Compose
Use Docker Compose and Temporal Cluster Docker images to quickly install and run a Temporal Cluster locally while developing Temporal Applications.
Prerequisites
Install Docker and Docker Compose.
Clone the repo and run Docker Compose
The following steps start and run a Temporal Cluster using the default configuration.
- Clone the temporalio/docker-compose repository.
git clone https://github.com/temporalio/docker-compose.git
- Change to the directory for the project.
cd docker-compose
- From your project directory, start your application.
docker-compose up
Results: You should have Temporal Cluster running at http://127.0.0.1:7233 and the Temporal Web UI at http://127.0.0.1:8080.
To try other configurations (different dependencies and databases), or to try a custom Docker image, follow the temporalio/docker-compose README.
Gitpod
You can run a Temporal Cluster and develop Temporal Applications in your browser using Gitpod.
One-click deployments are available for the temporalio/samples-go repo and the temporalio/samples-typescript repo.
A one-click deployment starts a Temporal Cluster using a Temporal Cluster Docker image, starts a Worker Process, and starts one of the application's sample Workflows.
A one-click deployment can take up to a full minute to get fully up and running. When it is running, you can customize the application samples.
Add your SDK
A Temporal SDK is a language-specific library that offers APIs to construct and use a Temporal Client to communicate with a Temporal Cluster, develop Workflow Definitions, and develop Worker Programs.
An SDK provides you with the following:
- A Temporal Client to communicate with a Temporal Cluster. A Temporal Cluster is the Temporal Server paired with persistence.
- APIs to develop Workflows. In day-to-day conversations, the term "Workflow" frequently denotes either a Workflow Type, a Workflow Definition, or a Workflow Execution.
- APIs to create and manage Worker Processes. In day-to-day conversations, the term "Worker" is used to denote both a Worker Program and a Worker Process. Temporal documentation aims to be explicit and differentiate between them.
- APIs to author Activities. An Activity Definition is the code that defines the constraints of an Activity Task Execution.
- Go
- Java
- PHP
- Python
- TypeScript
Add the Temporal Go SDK to your project:
go get go.temporal.io/sdk
Or clone the Go SDK repo to your preferred location:
git clone git@github.com:temporalio/sdk-go.git
Add the Temporal Java SDK to your project as a dependency:
Apache Maven:
<dependency>
  <groupId>io.temporal</groupId>
  <artifactId>temporal-sdk</artifactId>
  <version>1.17.0</version>
</dependency>
Gradle Groovy DSL:
implementation 'io.temporal:temporal-sdk:1.17.0'
Other:
Additional scripts for each SDK version are available here: https://search.maven.org/artifact/io.temporal/temporal-sdk. Select an SDK version to see available scripts.
The Temporal PHP SDK is available as a Composer package and can be installed by running the following command in the root of your project:
composer require temporal/sdk
The Temporal PHP SDK requires the RoadRunner 2.0 application server and supervisor to run Activities and Workflows in a scalable way.
Install RoadRunner manually by downloading its binary from the release page.
Or install RoadRunner through the CLI:
composer require spiral/roadrunner:v2.0 nyholm/psr7
./vendor/bin/rr get-binary
To install the latest version of the Temporal Python package, run the following command.
pip install temporalio
The Temporal TypeScript SDK requires Node.js 14 or later.
Create a new project
npx @temporalio/create@latest ./your-app
Add to an existing project
npm install @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/activity @temporalio/common
The TypeScript SDK is designed with TypeScript-first developer experience in mind, but it works equally well with JavaScript.
API reference
Each SDK has its own API reference. Select a programming language and follow the link to be taken to that reference page.
- Go
- Java
- PHP
- Python
- TypeScript
The Temporal Go SDK API reference is published on pkg.go.dev.
The Temporal Java SDK API reference is published on javadoc.io.
The Temporal PHP SDK API reference is currently unavailable.
The Temporal Python SDK API reference is published on python.temporal.io.
The Temporal TypeScript SDK API reference is published on typescript.temporal.io.
Code samples
You can find a complete list of executable code samples in Temporal's GitHub repository.
Additionally, several of the Tutorials are backed by a fully executable template application.
- Go
- Java
- PHP
- Python
- TypeScript
- Go Samples repo
- Background Check application: Provides a non-trivial Temporal Application implementation in conjunction with application documentation.
- Hello world application template in Go: Provides a quick-start development app for users. This sample works in conjunction with the "Hello World!" from scratch tutorial in Go.
- Money transfer application template in Go: Provides a quick-start development app for users. It demonstrates a basic "money transfer" Workflow Definition and works in conjunction with the Run your first app tutorial in Go.
- Subscription-style Workflow Definition in Go: Demonstrates some of the patterns that could be implemented for a subscription-style business process.
- eCommerce application example in Go: Showcases a per-user shopping cart–style Workflow Definition that includes an API for adding and removing items from the cart as well as a web UI. This application sample works in conjunction with the eCommerce in Go tutorial.
- Java samples library
- Hello world application template in Java: Provides a quick-start development app for users. Works in conjunction with the "Hello World!" from scratch tutorial in Java.
- Money transfer application template in Java: Provides a quick-start development app for users. It demonstrates a basic "money transfer" Workflow Definition and works in conjunction with the Run your first app tutorial in Java.
- Subscription-style Workflow Definition in Java: Demonstrates some of the patterns that could be implemented for a subscription-style business process.
- Subscription-style Workflow Definition in PHP: Demonstrates some of the patterns that could be implemented for a subscription-style business process.
- TypeScript samples library: Stored on GitHub, it demonstrates various capabilities of Temporal.
Connect to a Cluster
A Temporal Client enables you to communicate with the Temporal Cluster. A Temporal Cluster is the Temporal Server paired with persistence. Communication with a Temporal Cluster includes, but isn't limited to, the following:
- Starting Workflow Executions.
- Sending Signals to Workflow Executions.
- Sending Queries to Workflow Executions.
- Getting the results of a Workflow Execution.
- Providing an Activity Task Token.
A Temporal Client cannot be initialized and used inside a Workflow. However, it is acceptable and common to use a Temporal Client inside an Activity to communicate with a Temporal Cluster.
When you are running a Cluster locally (such as Temporalite), the Cluster is served at 127.0.0.1:7233, so few connection options are needed.
When you are connecting to a production Cluster (such as Temporal Cloud), you will likely need to provide additional connection and client options that might include, but aren't limited to, the following:
- An address and port number.
- A Namespace Name (like a Temporal Cloud Namespace: <Namespace_ID>.tmprl.cloud).
- mTLS CA certificate.
- mTLS private key.
For more information about managing and generating client certificates for Temporal Cloud, see How to manage certificates in Temporal Cloud.
For more information about configuring TLS to secure inter and intra network communication for a Temporal Cluster, see Temporal Customization Samples.
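One way to keep the local and production options from the list above in a single place is a small settings object that is validated before a Client is created. The following is a minimal sketch using only the Python standard library. `ConnectionSettings` and its field names are hypothetical and are not part of any Temporal SDK; the default address and the certificate file paths simply mirror the values used in this guide.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ConnectionSettings:
    """Hypothetical holder for the connection options listed above (not an SDK type)."""

    target_host: str = "127.0.0.1:7233"  # local address used throughout this guide
    namespace: str = "default"
    client_cert_path: Optional[str] = None  # mTLS certificate
    client_key_path: Optional[str] = None  # mTLS private key

    def __post_init__(self) -> None:
        host, _, port = self.target_host.rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {self.target_host!r}")
        # A certificate without its key (or the reverse) cannot be used for mTLS.
        if (self.client_cert_path is None) != (self.client_key_path is None):
            raise ValueError("client certificate and private key must be set together")

    @property
    def uses_mtls(self) -> bool:
        return self.client_cert_path is not None


# Local development: the defaults are enough.
local = ConnectionSettings()

# Production-style settings: address, Namespace, and mTLS material.
cloud = ConnectionSettings(
    target_host="your-custom-namespace.tmprl.cloud:7233",
    namespace="your-custom-namespace",
    client_cert_path="./path-to/your.pem",
    client_key_path="./path-to/your.key",
)
```

The validated values would then be passed to whichever SDK call creates the Client, as shown in the language-specific examples that follow.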
- Go
- Java
- PHP
- Python
- TypeScript
Use the Dial() API available in the go.temporal.io/sdk/client package to create a new Client.
If you don't provide HostPort, the Client defaults the address and port number to 127.0.0.1:7233.
Set a custom Namespace name in the Namespace field on an instance of the Client Options.
Use the ConnectionOptions API to connect a Client with mTLS.
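import (
    "crypto/tls"
    "log"
    // ...
    "go.temporal.io/sdk/client"
)
func main() {
    // clientCertPath and clientKeyPath are placeholders for your certificate and key file paths.
    cert, err := tls.LoadX509KeyPair(clientCertPath, clientKeyPath)
    if err != nil {
        log.Fatalln("Unable to load certificate and key:", err)
    }
    temporalClient, err := client.Dial(client.Options{
        HostPort:  "your-custom-namespace.tmprl.cloud:7233",
        Namespace: "your-custom-namespace",
        ConnectionOptions: client.ConnectionOptions{
            TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
        },
    })
    if err != nil {
        log.Fatalln("Unable to create Temporal Client:", err)
    }
    defer temporalClient.Close()
    // ...
}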
To initialize a Workflow Client, create an instance of a WorkflowClient, create a client-side WorkflowStub, and then call a Workflow method (annotated with @WorkflowMethod).
To start a Workflow Execution, your Temporal Server must be running, and your front-end service must be accepting gRPC calls.
To establish a connection with the front-end service, use WorkflowServiceStubs.
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
You can provide WorkflowServiceStubsOptions to override the default values for the gRPC calls.
For example, the default front-end service gRPC address is set to 127.0.0.1:7233, where 7233 is the default port for the Temporal Frontend Service.
If your server is running on a different host or port from the default, you can set it as shown in the following example.
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(
WorkflowServiceStubsOptions.newBuilder()
.setTarget(TARGET_ENDPOINT)
.build());
After the connection to the Temporal Frontend Service is established, create a Client for the service stub. The Workflow Client helps with client-side APIs and is required by Workers.
Create an instance of a WorkflowClient for the Workflow service stub, and use WorkflowClientOptions to set options for the Workflow Client.
The following example shows how to create a WorkflowClient instance called "client" for the WorkflowServiceStubs "service" that we created in the previous example, and set the Namespace option for the WorkflowClient.
WorkflowClient client = WorkflowClient.newInstance(
    service,
    WorkflowClientOptions.newBuilder()
        .setNamespace("Abc")
        .build());
For more information, see WorkflowClientOptions.
Creating WorkflowService and WorkflowClient instances is a heavyweight operation and is resource-intensive if repeated each time you start a Workflow or send a Signal to it.
The recommended way is to create them once and reuse them where possible.
With the Client defined, you can start interacting with the Temporal Frontend Service.
To initialize a Workflow in the Client, create a WorkflowStub, and start the Workflow Execution with WorkflowClient.start().
Starting Workflows or sending Signals or Queries to Workflows from within a Client must be done using WorkflowStubs.
WorkflowClient workflowClient = WorkflowClient.newInstance(service, clientOptions);
// Create a Workflow stub.
YourWorkflow workflow = workflowClient.newWorkflowStub(YourWorkflow.class);
// Start Workflow asynchronously and call its "yourWFMethod" Workflow method
WorkflowClient.start(workflow::yourWFMethod);
Create an instance of the WorkflowClient class by using the create() method to connect a Temporal Client to a Temporal Cluster.
Specify the target host, localhost:7233, as a string parameter and provide the TLS configuration for connecting to a Temporal Cluster.
use Temporal\Client\GRPC\ServiceClient;
use Temporal\Client\WorkflowOptions;
# . . .
$workflowClient = Temporal\Client\WorkflowClient::create(
ServiceClient::createSSL(
'localhost:7233',
'certs/ca.cert',
'certs/client.key',
'certs/client.pem',
'tls-sample',
),
);
To provide the client options as environment variables, add the tls option to the RoadRunner configuration file and pass the path to the file.
temporal:
  # . . .
  tls:
    key: "certs/client.key"
    cert: "certs/client.pem"
    root_ca: "certs/ca.cert"
    client_auth_type: require_and_verify_client_cert
    server_name: "tls-sample"
Then update your application and use the SSL connection for ServiceClient.
$workflowClient = Temporal\Client\WorkflowClient::create(
ServiceClient::createSSL(
'localhost:7233',
getenv('TEMPORAL_SERVER_ROOT_CA_CERT_PATH'),
getenv('TEMPORAL_CLIENT_KEY_PATH'),
getenv('TEMPORAL_CLIENT_CERT_PATH'),
getenv('TEMPORAL_SERVER_NAME_OVERRIDE')
),
);
Use the connect() method on the Client class to create a Temporal Client and connect it to the Temporal Cluster.
Specify the target_host parameter as a string and provide the tls configuration for connecting to a Temporal Cluster.
client = await Client.connect(
    # target_host for the Temporal Cloud
    "your-custom-namespace.tmprl.cloud:7233",
    # target_host for Temporalite
    # "127.0.0.1:7233"
    namespace="your-custom-namespace",
    tls=TLSConfig(
        client_cert=client_cert,
        client_private_key=client_private_key,
        # domain=domain
        # server_root_ca_cert=server_root_ca_cert,
    ),
)
Creating a WorkflowClient() creates a new connection to the Temporal service.
If you omit the connection and just call new WorkflowClient(), you create a default connection that works locally.
However, always configure your connection and Namespace when deploying to production.
Use the connectionOptions API available in the WorkflowClient package to create a new client to communicate with a Temporal Cluster.
Use a new WorkflowClient() with the requisite gRPC Connection to connect a Client and set your Namespace name.
Use the connectionOptions API to connect a Client with mTLS.
import fs from "fs-extra";
import { Connection, WorkflowClient } from "@temporalio/client";
async function run() {
  // Read the mTLS certificate and private key from disk.
  const cert = await fs.readFile("./path-to/your.pem");
  const key = await fs.readFile("./path-to/your.key");
  const connectionOptions = {
    address: "your-custom-namespace.tmprl.cloud:7233",
    tls: {
      clientCertPair: {
        crt: cert,
        key: key,
      },
      // serverRootCACertificatePath: "ca.cert",
    },
  };
  const connection = await Connection.connect(connectionOptions);
  const client = new WorkflowClient({
    connection,
    // connects to 'default' namespace if not specified
    namespace: "your-custom-namespace",
  });
  // . . .
}
run().catch((err) => {
  console.error(err);
  process.exit(1);
});
Develop Workflows
Workflows are the fundamental unit of a Temporal Application, and it all starts with the development of a Workflow Definition. A Workflow Definition is the code that defines the constraints of a Workflow Execution.
- Go
- Java
- PHP
- Python
- TypeScript
In the Temporal Go SDK programming model, a Workflow Definition is an exportable function.
func YourWorkflowDefinition(ctx workflow.Context) error {
// ...
return nil
}
In Go, by default, the Workflow Type name is the same as the function name.
In the Temporal Java SDK programming model, a Workflow definition comprises a Workflow interface annotated with @WorkflowInterface and a Workflow implementation that implements the Workflow interface.
The Workflow interface is a Java interface and is annotated with @WorkflowInterface.
Each Workflow interface must have only one method annotated with @WorkflowMethod.
The method name can be used to denote the Workflow Type.
// Workflow interface
@WorkflowInterface
public interface YourWorkflow {
@WorkflowMethod
String yourWFMethod(Arguments args);
}
However, when using dynamic Workflows, do not specify a @WorkflowMethod, and implement the DynamicWorkflow directly in the Workflow implementation code.
The @WorkflowMethod identifies the method that is the starting point of the Workflow Execution.
The Workflow Execution completes when this method completes.
You can create interface inheritance hierarchies to reuse components across other Workflow interfaces.
The interface inheritance approach does not apply to @WorkflowMethod annotations.
A Workflow implementation implements a Workflow interface.
// Define the Workflow implementation, which implements the GreetingWorkflow interface.
public static class GreetingWorkflowImpl implements GreetingWorkflow {
  // ...
}
To call Activities in your Workflow, call the Activity implementation.
Use ExternalWorkflowStub to start or send Signals from within a Workflow to other running Workflow Executions.
You can also invoke other Workflows as Child Workflows with Workflow.newChildWorkflowStub() or Workflow.newUntypedChildWorkflowStub() within a Workflow Definition.
Use DynamicWorkflow to implement Workflow Types dynamically.
Register a Workflow implementation type that extends DynamicWorkflow to implement any Workflow Type that is not explicitly registered with the Worker.
The dynamic Workflow interface is implemented with the execute method. This method takes in EncodedValues that are inputs to the Workflow Execution.
These inputs can be specified by the Client when invoking the Workflow Execution.
public class MyDynamicWorkflow implements DynamicWorkflow {
  @Override
  public Object execute(EncodedValues args) {
    // Workflow logic goes here; a placeholder value is returned so the method compiles.
    return null;
  }
}
In the Temporal PHP SDK programming model, a Workflow is a class method. Classes must implement interfaces that are annotated with #[WorkflowInterface]. The method that is the Workflow must be annotated with #[WorkflowMethod].
use Temporal\Workflow\WorkflowInterface;
use Temporal\Workflow\WorkflowMethod;
#[WorkflowInterface]
interface FileProcessingWorkflow
{
    #[WorkflowMethod]
    public function processFile(Argument $args);
}
In the Temporal Python SDK programming model, Workflows are defined as classes.
Specify the @workflow.defn decorator on the Workflow class to identify a Workflow.
Use the @workflow.run decorator to mark the entry point method to be invoked.
This must be set on one asynchronous method defined on the same class as @workflow.defn.
Run methods have positional parameters.
@workflow.defn
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity, name, schedule_to_close_timeout=timedelta(seconds=5)
        )
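In the Temporal TypeScript SDK programming model, Workflow Definitions are just functions, which can store state and orchestrate Activity Functions.
The following code snippet uses proxyActivities to schedule a greet Activity in the system to say hello.
A Workflow Definition can have multiple parameters; however, we recommend using a single object parameter.
import { proxyActivities } from '@temporalio/workflow';
// Assumes a greet Activity is exported from './activities'; the timeout value is illustrative.
import type * as activities from './activities';
const { greet } = proxyActivities<typeof activities>({
  startToCloseTimeout: '1 minute',
});
type ExampleArgs = {
  name: string;
};
export async function example(
  args: ExampleArgs,
): Promise<{ greeting: string }> {
  const greeting = await greet(args.name);
  return { greeting };
}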
Workflow parameters
Temporal Workflows may have any number of custom parameters. However, it is strongly recommended that objects are used as parameters, so that the object's individual fields may be altered without breaking the signature of the Workflow. All Workflow Definition parameters must be serializable.
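To illustrate why a single object parameter is easier to evolve than a list of positional parameters, the following sketch round-trips a parameter object through JSON. It uses only the Python standard library and is not Temporal's data converter: the `encode` and `decode` helpers and the `greeting` field are hypothetical, added only to show that a field with a default can be introduced without breaking payloads written by older callers.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class YourWorkflowParams:
    name: str
    born: int
    # Hypothetical field added in a later version; the default keeps older callers working.
    greeting: str = "Hello"


def encode(params: YourWorkflowParams) -> bytes:
    """Serialize the single parameter object to JSON bytes."""
    return json.dumps(asdict(params)).encode("utf-8")


def decode(payload: bytes) -> YourWorkflowParams:
    """Rebuild the parameter object; fields missing from the payload use their defaults."""
    return YourWorkflowParams(**json.loads(payload))


# A payload produced before the greeting field existed still decodes.
old_payload = b'{"name": "Temporal", "born": 2019}'
params = decode(old_payload)
```

Had the Workflow instead taken `name` and `born` as separate positional parameters, adding a third parameter would have changed the Workflow's signature for every caller.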
- Go
- Java
- PHP
- Python
- TypeScript
The first parameter of a Go-based Workflow Definition must be of the workflow.Context type, as it is used by the Temporal Go SDK to pass around Workflow Execution context, and virtually all the Go SDK APIs that are callable from the Workflow require it.
It is acquired from the go.temporal.io/sdk/workflow package.
import (
    "go.temporal.io/sdk/workflow"
)
func YourWorkflowDefinition(ctx workflow.Context, param string) error {
    // ...
    return nil
}
The workflow.Context entity operates similarly to the standard context.Context entity provided by Go.
The only difference between workflow.Context and context.Context is that the Done() function, provided by workflow.Context, returns workflow.Channel instead of the standard Go chan.
The second parameter, string, is a custom parameter that is passed to the Workflow when it is invoked.
A Workflow Definition may support multiple custom parameters, or none.
These parameters can be regular type variables or safe pointers.
However, the best practice is to pass a single parameter that is of a struct type, so there can be some backward compatibility if new parameters are added.
type YourWorkflowParam struct {
    WorkflowParamFieldOne string
    WorkflowParamFieldTwo int
}
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
    // ...
    return nil
}
All Workflow Definition parameters must be serializable, regardless of whether pointers or regular type values are used. Parameters can’t be channels, functions, variadic, or unsafe pointers.
A method annotated with @WorkflowMethod can have any number of parameters.
We recommend passing a single parameter that contains all the input fields to allow for adding fields in a backward-compatible manner.
Note that all inputs should be serializable by the default Jackson JSON Payload Converter.
You can create a custom object and pass it to the Workflow method, as shown in the following example.
//...
@WorkflowInterface
public interface YourWorkflow {
@WorkflowMethod
String yourWFMethod(CustomObj customobj);
// ...
}
A method annotated with #[WorkflowMethod] can have any number of parameters.
We recommend passing a single parameter that contains all the input fields to allow for adding fields in a backward-compatible manner.
Note that all inputs should be serializable to a byte array using the provided DataConverter interface. The default implementation uses a JSON serializer, but an alternative implementation can be easily configured. You can create a custom object and pass it to the Workflow method, as shown in the following example:
#[WorkflowInterface]
interface FileProcessingWorkflow {
#[WorkflowMethod]
public function processFile(Argument $args);
}
Workflow parameters are the method parameters of the singular method decorated with @workflow.run.
These can be any data type Temporal can convert, including dataclasses when properly type-annotated.
Technically this can be multiple parameters, but Temporal strongly encourages a single dataclass parameter containing all input fields.
@dataclass
class YourParams:
    your_int_param: int
    your_str_param: str
@workflow.defn
class YourWorkflow:
    @workflow.run
    async def run(self, params: YourParams) -> None:
        ...
You can define and pass parameters in your Workflow. In this example, you define your arguments in your client.ts file and pass those parameters to workflows.ts through your Workflow function.
Start a Workflow with the parameters that are in the client.ts file. In this example we set the name parameter to Temporal and born to 2019. Then set the Task Queue and Workflow Id.
client.ts
import { example } from './workflows';
...
await client.start(example, {
args: [{ name: 'Temporal', born: 2019 }],
taskQueue: 'your-queue',
workflowId: 'business-meaningful-id',
});
In workflows.ts, define the type of the parameter that the Workflow function takes in. The interface ExampleParam is a name we can now use to describe the argument passed in the previous example: an object with two properties, name of type string and born of type number. Then define a function that takes in a parameter of the type ExampleParam and returns a Promise<string>. The Promise object represents the eventual completion, or failure, of the Workflow function and its resulting value.
TypeScript:
interface ExampleParam {
  name: string;
  born: number;
}
export async function example({ name, born }: ExampleParam): Promise<string> {
  return `Hello ${name}, you were born in ${born}.`;
}
JavaScript:
export async function example({ name, born }) {
  return `Hello ${name}, you were born in ${born}.`;
}
Workflow return values
Workflow return values must also be serializable. Returning results, returning errors, or throwing exceptions is fairly idiomatic in each language that is supported. However, Temporal APIs that must be used to get the result of a Workflow Execution will only ever receive one of either the result or the error.
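The "one of either the result or the error" rule can be pictured with a small container type. The sketch below uses only the Python standard library: `Outcome` and `run_to_outcome` are hypothetical names and do not correspond to any Temporal API. They only model what the caller observes, namely that a failure replaces the result rather than accompanying it.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class Outcome(Generic[T]):
    """Hypothetical container: holds a result or an error message, never both."""

    result: Optional[T] = None
    error: Optional[str] = None

    def get(self) -> Optional[T]:
        # Surface the failure to the caller instead of a partial result.
        if self.error is not None:
            raise RuntimeError(self.error)
        return self.result


def run_to_outcome(fn: Callable[[], T]) -> Outcome[T]:
    """Run fn; if it raises, any value it was building is discarded."""
    try:
        return Outcome(result=fn())
    except Exception as exc:
        return Outcome(error=str(exc))


succeeded = run_to_outcome(lambda: {"greeting": "Hello Temporal"})
failed = run_to_outcome(lambda: 1 / 0)
```

Calling `succeeded.get()` returns the dictionary, while `failed.get()` raises, which mirrors the way a caller either receives a Workflow result or an error.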
- Go
- Java
- PHP
- Python
- TypeScript
A Go-based Workflow Definition can return either just an error or a customValue, error combination.
Again, the best practice here is to use a struct type to hold all custom values.
type YourWorkflowResponse struct {
    WorkflowResultFieldOne string
    WorkflowResultFieldTwo int
}
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) (YourWorkflowResponse, error) {
    // ...
    if err != nil {
        // Return the zero value of the response type alongside the error.
        return YourWorkflowResponse{}, err
    }
    responseVar := YourWorkflowResponse{
        WorkflowResultFieldOne: "super",
        WorkflowResultFieldTwo: 1,
    }
    return responseVar, nil
}
A Workflow Definition written in Go can return both a custom value and an error.
However, it's not possible to receive both a custom value and an error in the calling process, as is normal in Go.
The caller will receive either one or the other.
Returning a non-nil error from a Workflow indicates that an error was encountered during its execution and the Workflow Execution should be terminated, and any custom return values will be ignored by the system.
Workflow method arguments and return values must be serializable and deserializable using the provided DataConverter.
The execute method for DynamicWorkflow can return type Object.
Ensure that your Client can handle an Object type return or is able to convert the Object type response.
A Workflow method returns a Generator.
To properly typecast the Workflow's return value in the client code, use the #[ReturnType()] annotation.
#[WorkflowInterface]
interface FileProcessingWorkflow {
    #[WorkflowMethod]
    #[ReturnType("string")]
    public function processFile(Argument $args);
}
To return a value from the Workflow, use return to return an object.
To get the results of a Workflow Execution, use either the start_workflow() or execute_workflow() asynchronous methods.
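@dataclass
class YourResult:
    your_int_param: int
    your_str_param: str
@workflow.defn
class YourWorkflow:
    @workflow.run
    async def run(self, params: YourResult) -> YourResult:
        # Return an instance of the dataclass, not the class itself.
        return YourResult(params.your_int_param, params.your_str_param)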
To return a value from the Workflow function, declare a return type of Promise<something>. The Promise is used to make asynchronous calls and comes with guarantees.
The following example uses a Promise<string> to eventually return a string built from the name and born parameters.
interface ExampleParam {
name: string;
born: number;
}
export async function example({ name, born }: ExampleParam): Promise<string> {
return `Hello ${name}, you were born in ${born}.`;
}
Workflow Type
Each Workflow has a Type, which is also referred to as the Workflow name.
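As a language-neutral picture of what the Workflow Type is used for, the sketch below models a lookup table from Type names to Workflow functions. It uses only the Python standard library; `WorkflowRegistry` is a hypothetical stand-in and does not reflect how any Temporal SDK is implemented. It assumes the function-name default, which is only one of the defaulting rules the SDKs use.

```python
from typing import Callable, Dict, Optional


class WorkflowRegistry:
    """Hypothetical stand-in for the table a Worker might keep of registered Workflows."""

    def __init__(self) -> None:
        self._by_type: Dict[str, Callable[..., object]] = {}

    def register(self, fn: Callable[..., object], name: Optional[str] = None) -> str:
        # Default the Workflow Type to the function name unless a custom name is given.
        workflow_type = name or fn.__name__
        if workflow_type in self._by_type:
            raise ValueError(f"Workflow Type {workflow_type!r} is already registered")
        self._by_type[workflow_type] = fn
        return workflow_type

    def lookup(self, workflow_type: str) -> Callable[..., object]:
        return self._by_type[workflow_type]


def your_workflow_definition() -> str:
    return "done"


registry = WorkflowRegistry()
default_type = registry.register(your_workflow_definition)
custom_type = registry.register(your_workflow_definition, name="YourWorkflowName")
```

Here `default_type` is the function's own name and `custom_type` is the explicitly supplied one; both resolve to the same function.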
The following examples demonstrate how to set a custom name for your Workflow Type.
- Go
- Java
- PHP
- Python
- TypeScript
To customize the Workflow Type, set the Name parameter with RegisterOptions when registering your Workflow with a Worker.
- Type: string
- Default: function name
// ...
w := worker.New(temporalClient, "your_task_queue_name", worker.Options{})
registerOptions := workflow.RegisterOptions{
Name: "YourWorkflowName",
// ...
}
w.RegisterWorkflowWithOptions(YourWorkflowDefinition, registerOptions)
// ...
The Workflow Type defaults to the short name of the Workflow interface.
In the following example, the Workflow Type defaults to NotifyUserAccounts.
@WorkflowInterface
public interface NotifyUserAccounts {
@WorkflowMethod
void notify(String[] accountIds);
}
To overwrite this default naming and assign a custom Workflow Type, use the @WorkflowMethod annotation with the name parameter.
In the following example, the Workflow Type is set to Abc.
@WorkflowInterface
public interface NotifyUserAccounts {
@WorkflowMethod(name = "Abc")
void notify(String[] accountIds);
}
When you set the Workflow Type this way, the value of the name parameter does not have to start with an uppercase letter.
To customize a Workflow Type, use the WorkflowMethod annotation to specify the name of the Workflow.
#[WorkflowMethod(name)]
If a Workflow Type is not specified, then the Workflow Type defaults to the interface name, which is YourWorkflowDefinitionInterface in this case.
#[WorkflowInterface]
interface YourWorkflowDefinitionInterface
{
#[WorkflowMethod]
public function processFile(Argument $args);
}
You can customize the Workflow name with a custom name in the decorator argument. For example, @workflow.defn(name="your-workflow-name"). If the name parameter is not specified, the Workflow name defaults to the class name.
@workflow.defn(name="your-workflow-name")
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity, name, schedule_to_close_timeout=timedelta(seconds=5)
        )
In TypeScript, the Workflow Type is the Workflow function name and there isn't a mechanism to customize the Workflow Type.
In the following example, the Workflow Type is the name of the function, helloWorld.
TypeScript:
export async function helloWorld(): Promise<string> {
  return '👋 Hello World!';
}
JavaScript:
export async function helloWorld() {
  return '👋 Hello World!';
}
Workflow logic requirements
Workflow logic is constrained by deterministic execution requirements. Therefore, each language is limited to the use of certain idiomatic techniques. However, each Temporal SDK provides a set of APIs that can be used inside your Workflow to interact with external (to the Workflow) application code.
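The reason non-deterministic calls must go through SDK-provided APIs can be shown with a toy model of replay. The sketch below uses only the Python standard library and is not how any Temporal SDK is implemented: `ReplayContext` and `side_effect` are hypothetical names. On the first run the value of a non-deterministic call is recorded; on replay the recorded value is returned instead of calling the function again, so the Workflow logic takes the same path both times.

```python
import random
from typing import Any, Callable, List, Optional


class ReplayContext:
    """Hypothetical recorder: stores side-effect results so that a replay reuses them."""

    def __init__(self, history: Optional[List[Any]] = None) -> None:
        self.replaying = history is not None
        self.history: List[Any] = list(history) if history is not None else []
        self._cursor = 0

    def side_effect(self, fn: Callable[[], Any]) -> Any:
        if self.replaying:
            # Replay: return the recorded value instead of calling fn again.
            value = self.history[self._cursor]
            self._cursor += 1
            return value
        # First run: call fn once and record what it returned.
        value = fn()
        self.history.append(value)
        return value


def workflow_logic(ctx: ReplayContext) -> str:
    # The non-deterministic call is routed through the context, never made directly.
    ticket = ctx.side_effect(lambda: random.randint(0, 10**9))
    return f"ticket-{ticket}"


first_run = ReplayContext()
first_result = workflow_logic(first_run)
replay_result = workflow_logic(ReplayContext(history=first_run.history))
```

If `workflow_logic` called `random.randint` directly, the replay would almost certainly produce a different ticket than the first run, which is the kind of divergence the constraints in this section are meant to prevent.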
- Go
- Java
- PHP
- Python
- TypeScript
In Go, Workflow Definition code cannot directly do the following:
- Iterate over maps using
range
, because withrange
the order of the map's iteration is randomized. Instead you can collect the keys of the map, sort them, and then iterate over the sorted keys to access the map. This technique provides deterministic results. You can also use a Side Effect or an Activity to process the map instead. - Call an external API, conduct a file I/O operation, talk to another service, etc. (Use an Activity for these.)
The Temporal Go SDK has APIs to handle equivalent Go constructs:
- workflow.Now(): This is a replacement for time.Now().
- workflow.Sleep(): This is a replacement for time.Sleep().
- workflow.GetLogger(): This ensures that the provided logger does not duplicate logs during a replay.
- workflow.Go(): This is a replacement for the go statement.
- workflow.Channel: This is a replacement for the native chan type. Temporal provides support for both buffered and unbuffered channels.
- workflow.Selector: This is a replacement for the select statement. Learn more on the Go SDK Selectors page.
- workflow.Context: This is a replacement for context.Context. Learn more on the Go SDK Context Propagation page.
When defining Workflows using the Temporal Java SDK, the Workflow code must be written to execute effectively once and to completion.
The following constraints apply when writing Workflow Definitions:
- Do not use mutable global variables in your Workflow implementations. This will ensure that multiple Workflow instances are fully isolated.
- Your Workflow code must be deterministic. Do not call non-deterministic functions (such as non-seeded random or UUID.randomUUID()) directly from the Workflow code. The Temporal SDK provides specific API for calling non-deterministic code in your Workflows.
- Do not use programming language constructs that rely on system time. For example, only use Workflow.currentTimeMillis() to get the current time inside a Workflow.
- Do not use native Java Thread or any other multi-threaded classes like ThreadPoolExecutor. Use Async.function or Async.procedure, provided by the Temporal SDK, to execute code asynchronously.
- Do not use synchronization, locks, or other standard Java blocking concurrency-related classes besides those provided by the Workflow class. There is no need for explicit synchronization because multi-threaded code inside a Workflow is executed one thread at a time and under a global lock.
  - Call Workflow.sleep instead of Thread.sleep.
  - Use Promise and CompletablePromise instead of Future and CompletableFuture.
  - Use WorkflowQueue instead of BlockingQueue.
- Use Workflow.getVersion when making any changes to the Workflow code. Without this, any deployment of updated Workflow code might break already running Workflows.
- Do not access configuration APIs directly from a Workflow because changes in the configuration might affect a Workflow Execution path. Pass it as an argument to a Workflow function or use an Activity to load it.
- Use DynamicWorkflow when you need a default Workflow that can handle all Workflow Types that are not registered with a Worker. A single implementation can implement a Workflow Type which by definition is dynamically loaded from some external source. All standard WorkflowOptions and determinism rules apply to Dynamic Workflow implementations.
Java Workflow reference: https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/workflow/package-summary.html
Temporal uses the Microsoft Azure Event Sourcing pattern to recover the state of a Workflow object, including its local variable values.
In essence, every time a Workflow state has to be restored, its code is re-executed from the beginning. When replaying, side effects (such as Activity invocations) are ignored because they are already recorded in the Workflow event history. When writing Workflow logic, the replay is not visible, so the code should be written as if it executes only once. This design puts the following constraints on the Workflow implementation:
- Do not use any mutable global variables because multiple instances of Workflows are executed in parallel.
- Do not call any non-deterministic functions like non-seeded random or UUID directly from the Workflow code.
Always do the following in the Workflow implementation code:
- Don't perform any I/O or service calls as they are not usually deterministic. Use Activities for this.
- Only use Workflow::now() to get the current time inside a Workflow.
- Call yield Workflow::timer() instead of sleep().
- Do not use any blocking SPL provided by PHP (for example, fopen, PDO, etc.) in Workflow code.
- Use yield Workflow::getVersion() when making any changes to the Workflow code. Without this, any deployment of updated Workflow code might break already open Workflows.
- Don't access configuration APIs directly from a Workflow because changes in the configuration might affect a Workflow execution path. Pass it as an argument to a Workflow function or use an Activity to load it.
Workflow method arguments and return values must be serializable to a byte array using the provided DataConverter interface. The default implementation uses a JSON serializer, but you can use any alternative serialization mechanism.
Make sure to annotate your WorkflowMethod using ReturnType to specify a concrete return type.
You cannot use the default return type declaration because Workflow methods are generators.
The values passed to Workflows through invocation parameters or returned through a result value are recorded in the execution history. The entire execution history is transferred from the Temporal service to Workflow workers with every event that the Workflow logic needs to process. A large execution history can thus adversely impact the performance of your Workflow. Therefore, be mindful of the amount of data that you transfer via Activity invocation parameters or return values. Otherwise, no additional limitations exist on Activity implementations.
Workflow code must be deterministic. This means:
- no threading
- no randomness
- no external calls to processes
- no network I/O
- no global state mutation
- no system date or time
All Workflow-safe APIs in the temporalio.workflow module must run in the implicit asyncio event loop and be deterministic.
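To see why these rules matter, here is a minimal sketch of replay written in plain Python. It does not use the Temporal SDK: ToyContext, run_step, and toy_workflow are hypothetical names invented for this illustration. Recorded step results are served from the history during replay, so the function reaches the same state only if it issues the same steps in the same order.

```python
from typing import Callable, List, Optional


class ToyContext:
    """Toy stand-in for a Workflow context (hypothetical, not the Temporal SDK)."""

    def __init__(self, history: Optional[List[str]] = None) -> None:
        # Results recorded by an earlier execution, in the order they happened.
        self.history: List[str] = list(history or [])
        self._cursor = 0
        # Counts the steps that actually ran instead of being replayed.
        self.executed_steps = 0

    def run_step(self, step: Callable[[], str]) -> str:
        if self._cursor < len(self.history):
            # Replay: reuse the recorded result and skip the side effect.
            result = self.history[self._cursor]
        else:
            # First execution: run the step and record its result.
            result = step()
            self.executed_steps += 1
            self.history.append(result)
        self._cursor += 1
        return result


def toy_workflow(ctx: ToyContext, name: str) -> str:
    greeting = ctx.run_step(lambda: f"Hello, {name}!")
    return ctx.run_step(lambda: greeting.upper())


first_run = ToyContext()
first_result = toy_workflow(first_run, "Temporal")

# Re-execute the same code from the beginning against the recorded history.
replay = ToyContext(history=first_run.history)
replay_result = toy_workflow(replay, "Temporal")

print(first_result, replay_result, replay.executed_steps)
```

In this toy model, a step that depended on randomness or the system clock to decide which step to run next could request steps in a different order on replay, and the recorded results would no longer line up.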
Content is currently unavailable.
Develop Activities
One of the primary things that Workflows do is orchestrate the execution of Activities.
Activities are normal function/method executions that can interact with the world.
For the Workflow to be able to execute the Activity, we must define the Activity Definition. An Activity Definition is the code that defines the constraints of an Activity Task Execution.
- Go
- Java
- PHP
- Python
- TypeScript
In the Temporal Go SDK programming model, an Activity Definition is an exportable function or a struct method.
Function
// basic function signature
func YourActivityDefinition(ctx context.Context) error {
    // ...
    return nil
}
// with parameters and return values
func SimpleActivity(ctx context.Context, value string) (string, error) {
    // ...
    return value, nil
}
Struct method
type YourActivityStruct struct {
    ActivityFieldOne string
    ActivityFieldTwo int
}
func (a *YourActivityStruct) YourActivityDefinition(ctx context.Context) error {
    // ...
    return nil
}
func (a *YourActivityStruct) YourActivityDefinitionTwo(ctx context.Context) error {
    // ...
    return nil
}
An Activity struct can have more than one method, with each method acting as a separate Activity Type. Activities written as struct methods can use shared struct variables, such as:
- an application level DB pool
- client connection to another service
- reusable utilities
- any other expensive resources that you only want to initialize once per process
Because this is such a common need, the rest of this guide shows Activities written as struct methods.
In day-to-day conversations, the term "Activity" frequently denotes either an Activity Type, an Activity Definition, or an Activity Execution.
An Activity interface is annotated with @ActivityInterface, and an Activity implementation implements this Activity interface.
To handle Activity types that do not have an explicitly registered handler, you can directly implement a dynamic Activity.
@ActivityInterface
public interface GreetingActivities {
String composeGreeting(String greeting, String language);
}
Each method defined in the Activity interface defines a separate Activity method.
You can annotate each method in the Activity interface with the @ActivityMethod annotation, but this is completely optional.
The following example uses the @ActivityMethod annotation for the method defined in the previous example.
@ActivityInterface
public interface GreetingActivities {
@ActivityMethod
String composeGreeting(String greeting, String language);
}
An Activity implementation is a Java class that implements an Activity annotated interface.
// Implementation for the GreetingActivities interface example from the previous section
static class GreetingActivitiesImpl implements GreetingActivities {
@Override
public String composeGreeting(String greeting, String name) {
return greeting + " " + name + "!";
}
}
Use DynamicActivity to implement any number of Activity types dynamically.
When an Activity implementation that implements DynamicActivity is registered, it is called for any Activity type invocation that doesn't have an explicitly registered handler.
The dynamic Activity interface is implemented with the execute method, as shown in the following example.
// Dynamic Activity implementation
public static class DynamicGreetingActivityImpl implements DynamicActivity {
@Override
public Object execute(EncodedValues args) {
String activityType = Activity.getExecutionContext().getInfo().getActivityType();
return activityType
+ ": "
+ args.get(0, String.class)
+ " "
+ args.get(1, String.class)
+ " from: "
+ args.get(2, String.class);
}
}
Use Activity.getExecutionContext() to get information about the Activity type that should be implemented dynamically.
Activities are defined as methods of a plain PHP interface annotated with #[ActivityInterface].
(You can also use PHP 8 attributes in PHP 7.)
Following is an example of an interface that defines four Activities:
// Defining an interface for the activities.
#[ActivityInterface]
interface FileProcessingActivities
{
    public function upload(string $bucketName, string $localName, string $targetName): void;
    #[ActivityMethod("transcode_file")]
    public function download(string $bucketName, string $remoteName): void;
    public function processFile(): string;
    public function deleteLocalFile(string $fileName): void;
}
You can develop an Activity Definition by using the @activity.defn decorator.
from temporalio import activity

@activity.defn
async def your_activity(name: str) -> str:
    return f"Hello, {name}!"
You can register the function as an Activity with a custom name through a decorator argument, for example @activity.defn(name="your-activity").
@activity.defn(name="your-activity")
async def your_activity(name: str) -> str:
    return f"Hello, {name}!"
Types of Activities
The following lists the different types of Activity callables:
Only positional arguments are supported by Activities.
Asynchronous Activities
Asynchronous Activities (recommended) are functions using async def. When using asynchronous Activities, there aren't any additional Worker parameters needed.
Cancellation for asynchronous Activities is done by means of the asyncio.Task.cancel operation. This means that asyncio.CancelledError will be raised (and can be caught, but it is not recommended).
An Activity must Heartbeat to receive cancellation.
Synchronous Activities
The activity_executor Worker parameter must be set with a concurrent.futures.Executor instance to use for executing the Activities.
Cancellation for synchronous Activities is done in the background and the Activity must choose to listen for it and react appropriately.
An Activity must Heartbeat to receive cancellation.
Multithreaded Activities are functions that use activity_executor set to an instance of concurrent.futures.ThreadPoolExecutor.
Besides activity_executor, no other additional Worker parameters are required for synchronous multithreaded Activities.
If activity_executor is set to an instance of concurrent.futures.Executor that is not concurrent.futures.ThreadPoolExecutor, then the synchronous Activities are considered multiprocess/other Activities.
These require special primitives for heartbeating and cancellation. The shared_state_manager Worker parameter must be set to an instance of worker.SharedStateManager. The most common implementation can be created by passing a multiprocessing.managers.SyncManager (for example, as a result of multiprocessing.Manager()) to worker.SharedStateManager.create_from_multiprocessing().
- Activities execute in the standard Node.js environment.
- Activities cannot be in the same file as Workflows and must be separately registered.
- Activities may be retried repeatedly, so you may need to use idempotency keys for critical side effects.
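The last point can be illustrated with a small sketch in plain Python. It is not Temporal SDK code: ToyPaymentService and charge_customer are hypothetical names, and the in-memory dictionary stands in for whatever deduplication a real downstream service provides. The idea is that a retried call reuses the same idempotency key, so the side effect is applied once.

```python
from typing import Dict


class ToyPaymentService:
    """Hypothetical downstream service that deduplicates by idempotency key."""

    def __init__(self) -> None:
        self._charges: Dict[str, int] = {}

    def charge(self, idempotency_key: str, amount_cents: int) -> int:
        # A repeated key returns the stored result instead of charging again.
        if idempotency_key not in self._charges:
            self._charges[idempotency_key] = amount_cents
        return self._charges[idempotency_key]

    def total_charged(self) -> int:
        return sum(self._charges.values())


def charge_customer(service: ToyPaymentService, order_id: str, amount_cents: int) -> int:
    # Derive the key from stable business data so every retry reuses it.
    key = f"charge-{order_id}"
    return service.charge(key, amount_cents)


service = ToyPaymentService()
charge_customer(service, "order-42", 1500)
charge_customer(service, "order-42", 1500)  # simulated retry of the same call
print(service.total_charged())  # prints 1500
```

Deriving the key from the order rather than generating a fresh random value on each attempt is what makes the retry safe in this sketch.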
Activities are just functions. The following is an Activity that accepts a string parameter and returns a string.
- TypeScript
- JavaScript
export async function greet(name: string): Promise<string> {
return `👋 Hello, ${name}!`;
}
export async function greet(name) {
return `👋 Hello, ${name}!`;
}
Activity parameters
There is no explicit limit to the total number of parameters that an Activity Definition may support.
A single argument is limited to a maximum size of 2 MB, and the total size of a gRPC message, which includes all the arguments, is limited to a maximum of 4 MB.
Also, keep in mind that all Payload data is recorded in the Workflow Execution Event History, an append log of Events that represents the full state of a Workflow Execution.
Some SDKs require that you pass context objects, others do not. When it comes to your application data—that is, data that is serialized and encoded into a Payload—we recommend that you use a single object as an argument that wraps the application data passed to Activities. This is so that you can change what data is passed to the Activity without breaking a function or method signature.
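As a rough aid for staying under the limits above, the following plain Python sketch estimates argument sizes before they are passed to an Activity. It is only an approximation under stated assumptions: check_activity_args and encoded_size are hypothetical helpers, the size is measured on a JSON encoding rather than on the actual Payload, the limits are interpreted as binary megabytes, and real gRPC messages carry additional overhead.

```python
import json
from typing import Any

# Limits quoted above, interpreted here as binary megabytes (an assumption).
MAX_ARGUMENT_BYTES = 2 * 1024 * 1024
MAX_MESSAGE_BYTES = 4 * 1024 * 1024


def encoded_size(value: Any) -> int:
    """Approximate the payload size as the length of its UTF-8 JSON encoding."""
    return len(json.dumps(value).encode("utf-8"))


def check_activity_args(*args: Any) -> int:
    """Raise ValueError if the arguments exceed the limits; return the total size."""
    sizes = [encoded_size(arg) for arg in args]
    for position, size in enumerate(sizes):
        if size > MAX_ARGUMENT_BYTES:
            raise ValueError(f"argument {position} is {size} bytes, over the 2 MB limit")
    total = sum(sizes)
    if total > MAX_MESSAGE_BYTES:
        raise ValueError(f"arguments total {total} bytes, over the 4 MB limit")
    return total


# A single wrapper object, as recommended above, is checked as one argument.
print(check_activity_args({"order_id": "order-42", "items": [1, 2, 3]}))
```

A check like this fits naturally in tests or at the call site, where oversized inputs can be replaced with a reference (for example, a storage key) before they reach the Event History.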
- Go
- Java
- PHP
- Python
- TypeScript
The first parameter of an Activity Definition is context.Context.
This parameter is optional for an Activity Definition, though it is recommended, especially if the Activity is expected to use other Go SDK APIs.
An Activity Definition can support as many other custom parameters as needed. However, all parameters must be serializable (parameters can’t be channels, functions, variadic, or unsafe pointers), and it is recommended to pass a single struct that can be updated later.
type YourActivityParam struct {
    ActivityParamFieldOne string
    ActivityParamFieldTwo int
}
type YourActivityStruct struct {
    // ...
}
func (a *YourActivityStruct) YourActivityDefinition(ctx context.Context, param YourActivityParam) error {
    // ...
    return nil
}
An Activity interface can have any number of parameters. All inputs should be serializable by the default Jackson JSON Payload Converter.
When implementing Activities, be mindful of the amount of data that you transfer using the Activity invocation parameters or return values, as these are recorded in the Workflow Execution Event History. Large Event Histories can adversely impact performance.
You can create a custom object, and pass it to the Activity interface, as shown in the following example.
@ActivityInterface
public interface YourActivities {
String getCustomObject(CustomObj customobj);
void sendCustomObject(CustomObj customobj, String abc);
}
The execute method in the dynamic Activity interface implementation takes in EncodedValues that are inputs to the Activity Execution, as shown in the following example.
// Dynamic Activity implementation
public static class DynamicActivityImpl implements DynamicActivity {
@Override
public Object execute(EncodedValues args) {
String activityType = Activity.getExecutionContext().getInfo().getActivityType();
return activityType
+ ": "
+ args.get(0, String.class)
+ " "
+ args.get(1, String.class)
+ " from: "
+ args.get(2, String.class);
}
}
For more details, see Dynamic Activity Reference.
Each method defines a single Activity type. A single Workflow can use more than one Activity interface and call more than one Activity method from the same interface.
The only requirement is that Activity method arguments and return values are serializable to a byte array using the provided DataConverter interface. The default implementation uses a JSON serializer, but an alternative implementation can be easily configured.
Activity parameters are the function parameters of the function decorated with @activity.defn.
These can be any data type Temporal can convert, including dataclasses when properly type-annotated.
Technically this can be multiple parameters, but Temporal strongly encourages a single dataclass parameter containing all input fields.
from dataclasses import dataclass

from temporalio import activity

@dataclass
class YourParams:
    your_int_param: int
    your_str_param: str

@activity.defn
async def your_activity(params: YourParams) -> None:
    ...
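The benefit of a single dataclass parameter shows up when the inputs change. The sketch below uses only the standard library: describe is a plain function standing in for an Activity body, and your_flag_param is a hypothetical field added for illustration. Because the new field has a default, the function signature and existing callers stay the same.

```python
from dataclasses import dataclass


@dataclass
class YourParams:
    your_int_param: int
    your_str_param: str
    # Added later (hypothetical field); the default keeps existing callers working.
    your_flag_param: bool = False


def describe(params: YourParams) -> str:
    """Plain function standing in for an Activity body."""
    suffix = " (flagged)" if params.your_flag_param else ""
    return f"{params.your_str_param} x{params.your_int_param}{suffix}"


# A caller written before the new field existed still works unchanged.
old_style = describe(YourParams(your_int_param=2, your_str_param="widget"))
new_style = describe(YourParams(3, "gadget", your_flag_param=True))
print(old_style)  # prints widget x2
print(new_style)  # prints gadget x3 (flagged)
```

How previously recorded inputs are decoded into the updated dataclass depends on the data converter in use, which this sketch does not cover.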
This Activity takes a single name parameter of type string.
- TypeScript
- JavaScript
export async function greet(name: string): Promise<string> {
return `👋 Hello, ${name}!`;
}
export async function greet(name) {
return `👋 Hello, ${name}!`;
}
Activity return values
All data returned from an Activity must be serializable.
There is no explicit limit to the amount of data that can be returned by an Activity, but keep in mind that all return values are recorded in a Workflow Execution Event History.
- Go
- Java
- PHP
- Python
- TypeScript
A Go-based Activity Definition can return either just an error or a customValue, error combination (same as a Workflow Definition).
You may wish to use a struct type to hold all custom values. Keep in mind that they must all be serializable.
type YourActivityResult struct {
    ActivityResultFieldOne string
    ActivityResultFieldTwo int
}
func (a *YourActivityStruct) YourActivityDefinition(ctx context.Context, param YourActivityParam) (YourActivityResult, error) {
    // ...
    result := YourActivityResult{
        ActivityResultFieldOne: a.ActivityFieldOne,
        ActivityResultFieldTwo: a.ActivityFieldTwo,
    }
    return result, nil
}
Activity return values must be serializable and deserializable by the provided DataConverter.
The execute method for DynamicActivity can return type Object.
Ensure that your Workflow or Client can handle an Object type return or is able to convert the Object type response.
Return values must be serializable to a byte array using the provided DataConverter interface. The default implementation uses a JSON serializer, but an alternative implementation can be easily configured. Thus, you can return both primitive types:
class GreetingActivity implements GreetingActivityInterface
{
public function composeGreeting(string $greeting, string $name): string
{
return $greeting . ' ' . $name;
}
}
And objects:
class GreetingActivity implements GreetingActivityInterface
{
public function composeGreeting(string $greeting, string $name): Greeting
{
return new Greeting($greeting, $name);
}
}
An Activity Execution can return inputs and other Activity values.
The following example defines an Activity that takes a string as input and returns a string.
from temporalio import activity

@activity.defn
async def say_hello(name: str) -> str:
    return f"Hello, {name}!"
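When an Activity returns more than a single value, one way to check that the result is serializable is to round-trip it through an encoding in a test. The sketch below uses only the standard library and JSON as a stand-in for a data converter; GreetingResult, build_greeting, and round_trip are hypothetical names, and the actual encoding used by a Worker may differ.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class GreetingResult:
    """Hypothetical result type holding everything the Activity returns."""

    greeting: str
    length: int


def build_greeting(name: str) -> GreetingResult:
    text = f"Hello, {name}!"
    return GreetingResult(greeting=text, length=len(text))


def round_trip(result: GreetingResult) -> GreetingResult:
    # Encode to JSON bytes and decode again, as a stand-in for a data converter.
    encoded = json.dumps(asdict(result)).encode("utf-8")
    return GreetingResult(**json.loads(encoded))


original = build_greeting("Temporal")
restored = round_trip(original)
print(restored == original)  # prints True
```

A result type that holds values JSON cannot represent (for example, an open file handle) would fail in json.dumps, which is the kind of problem this check is meant to surface early.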
In TypeScript, the return value is always a Promise.
In the following example, Promise<string> is the return value.
export async function greet(name: string): Promise<string> {
return `👋 Hello, ${name}!`;
}
Activity Type
Each Activity has a Type, which is also referred to as the Activity name. The following examples demonstrate how to set a custom name for your Activity Type.
- Go
- Java
- PHP
- Python
- TypeScript
To customize the Activity Type, set the Name parameter with RegisterOptions when registering your Activity with a Worker.
- Type: string
- Default: function name
// ...
w := worker.New(temporalClient, "your_task_queue_name", worker.Options{})
registerOptions := activity.RegisterOptions{
Name: "YourActivityName",
// ...
}
w.RegisterActivityWithOptions(a.YourActivityDefinition, registerOptions)
// ...
The Activity Type defaults to the method name, with the first letter of the method name capitalized, and can be customized using namePrefix() or ActivityMethod.name() to ensure that Activity Types are distinct.
In the following example, the Activity Type defaults to ComposeGreeting.
@ActivityInterface
public interface GreetingActivities {
@ActivityMethod
String composeGreeting(String greeting, String language);
}
To overwrite this default naming and assign a custom Activity Type, use the @ActivityMethod annotation with the name parameter.
In the following example, the Activity Type is set to "greet".
@ActivityInterface
public interface GreetingActivities {
@ActivityMethod(name = "greet")
String composeGreeting(String greeting, String language);
}
You can also define a prefix for all of your Activity Types using the namePrefix parameter with the @ActivityInterface annotation.
The following example shows a namePrefix parameter applied to the @ActivityInterface, and two Activity methods, of which one is defined using the @ActivityMethod annotation.
@ActivityInterface(namePrefix = "A_")
public interface GreetingActivities {
String sendGreeting(String input);
@ActivityMethod(name = "abc")
String composeGreeting(String greeting, String language);
}
In this example, the Activity type for the first method is set to A_SendGreeting.
The Activity type for the method annotated with @ActivityMethod is set to A_abc.
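The naming rules described in this section can be summarized with a small sketch in plain Python. resolve_activity_type is a hypothetical helper written only to mirror the rules stated above; it is not part of the Temporal Java SDK.

```python
from typing import Optional


def resolve_activity_type(
    method_name: str,
    name_prefix: str = "",
    explicit_name: Optional[str] = None,
) -> str:
    """Hypothetical helper mirroring the naming rules described above."""
    if explicit_name is not None:
        # An explicit name is used as written, after the prefix.
        base = explicit_name
    else:
        # Default: the method name with its first letter capitalized.
        base = method_name[:1].upper() + method_name[1:]
    return name_prefix + base


print(resolve_activity_type("composeGreeting"))  # prints ComposeGreeting
print(resolve_activity_type("composeGreeting", explicit_name="greet"))  # prints greet
print(resolve_activity_type("sendGreeting", name_prefix="A_"))  # prints A_SendGreeting
print(resolve_activity_type("composeGreeting", name_prefix="A_", explicit_name="abc"))  # prints A_abc
```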
An optional #[ActivityMethod] annotation can be used to override a default Activity name.
You can define your own prefix for all Activity names by adding the prefix option to the ActivityInterface annotation.
(The default prefix is empty.)
#[ActivityInterface("file_activities.")]
interface FileProcessingActivities
{
    public function upload(string $bucketName, string $localName, string $targetName);
    #[ActivityMethod("transcode_file")]
    public function download(string $bucketName, string $remoteName);
    public function processFile(): string;
    public function deleteLocalFile(string $fileName);
}
The #[ActivityInterface("file_activities.")] annotation tells the PHP SDK to generate a class to implement the FileProcessingActivities interface. The functions define Activities that are used in the Workflow.
You can customize the Activity name by passing a custom name as the decorator argument, for example @activity.defn(name="your-activity"). If the name parameter is not specified, the Activity name defaults to the function name.
@activity.defn(name="your-activity")
async def your_activity(name: str) -> str:
    return f"Hello, {name}!"
You can customize the name of the Activity when you register it with the Worker.
In the following example, the Activity Name is activityFoo.
snippets/src/worker-activity-type-custom.ts
- TypeScript
- JavaScript
import { Worker } from '@temporalio/worker';
import { greet } from './activities';
async function run() {
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
taskQueue: 'snippets',
activities: {
activityFoo: greet,
},
});
await worker.run();
}
import { Worker } from '@temporalio/worker';
import { greet } from './activities';
async function run() {
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
taskQueue: 'snippets',
activities: {
activityFoo: greet,
},
});
await worker.run();
}
Activity Execution
Calls to spawn Activity Executions are written within a Workflow Definition.
A single instance of the Activities implementation is shared across multiple simultaneous Activity invocations. Therefore, the Activity implementation code must be stateless.
The values passed to Activities through invocation parameters or returned through a result value are recorded in the Execution history. The entire Execution history is transferred from the Temporal service to Workflow Workers when a Workflow state needs to recover. A large Execution history can thus adversely impact the performance of your Workflow.
Therefore, be mindful of the amount of data you transfer through Activity invocation parameters or Return Values. Otherwise, no additional limitations exist on Activity implementations.
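The statelessness requirement can be sketched in plain Python. This is a toy model rather than SDK code: GreetingActivities and its templates are hypothetical, and a thread pool stands in for simultaneous Activity invocations. Read-only configuration lives on the shared instance, while everything specific to one invocation stays in local variables.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


class GreetingActivities:
    """One instance is shared by all invocations (toy example, not the SDK)."""

    def __init__(self, templates: Dict[str, str]) -> None:
        # Read-only configuration set once at startup is safe to share.
        self._templates = dict(templates)

    def compose_greeting(self, language: str, name: str) -> str:
        # Per-invocation data lives in local variables, never on self.
        template = self._templates[language]
        return template.format(name=name)


activities = GreetingActivities({"en": "Hello, {name}!", "es": "Hola, {name}!"})
calls = [("en", "Ada"), ("es", "Luis"), ("en", "Grace"), ("es", "Marta")]

# Invoke the same instance from several threads at once.
with ThreadPoolExecutor(max_workers=4) as pool:
    results: List[str] = list(
        pool.map(lambda call: activities.compose_greeting(*call), calls)
    )

print(results)
```

If compose_greeting instead stored the current name on self before formatting, concurrent invocations could overwrite each other's value, which is the failure mode the requirement guards against.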
- Go
- Java
- PHP
- Python
- TypeScript
To spawn an Activity Execution, use the ExecuteActivity() API call inside your Workflow Definition.
The API is available from the go.temporal.io/sdk/workflow package.
The ExecuteActivity() API call requires an instance of workflow.Context, the Activity function name, and any variables to be passed to the Activity Execution.
import (
// ...
"go.temporal.io/sdk/workflow"
)
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) (YourWorkflowResponse, error) {
// ...
yourActivityParam := YourActivityParam{
// ...
}
var activities *YourActivityStruct
future := workflow.ExecuteActivity(ctx, activities.YourActivityDefinition, yourActivityParam)
// ...
}
func (a *YourActivityStruct) YourActivityDefinition(ctx context.Context, param YourActivityParam) error {
// ...
}
The Activity function name can be provided as a variable object (no quotations) or as a string.
// ...
future := workflow.ExecuteActivity(ctx, "YourActivityDefinition", yourActivityParam)
// ...
The benefit of passing the actual function object is that the framework can validate the parameters against the Activity Definition.
The ExecuteActivity call returns a Future, which can be used to get the result of the Activity Execution.
Activities are remote procedure calls that must be invoked from within a Workflow using ActivityStub.
Activities are not executable on their own. You cannot start an Activity Execution by itself.
Note that before an Activity Execution is invoked:
- Activity options must be set for the Activity (either setStartToCloseTimeout or setScheduleToCloseTimeout is required). A Start-To-Close Timeout is the maximum time allowed for a single Activity Task Execution. A Schedule-To-Close Timeout is the maximum amount of time allowed for the overall Activity Execution, from when the first Activity Task is scheduled to when the last Activity Task, in the chain of Activity Tasks that make up the Activity Execution, reaches a Closed status. For details, see Set Activity Options and Activity Options reference.
- The Activity must be registered with a Worker. See Worker Program. Use the newWorker method on an instance of a WorkerFactory to create a new Worker in Java.
- Activity code must be thread-safe.
Activities should only be instantiated using stubs from within a Workflow.
An ActivityStub returns a client-side stub that implements an Activity interface.
You can invoke Activities using Workflow.newActivityStub (type-safe) or Workflow.newUntypedActivityStub (untyped).
Calling a method on the Activity interface schedules the Activity invocation with the Temporal service and generates an ActivityTaskScheduled Event. Events are created by the Temporal Cluster in response to external occurrences and Commands generated by a Workflow Execution.
Activities can be invoked synchronously or asynchronously.
Invoking Activities Synchronously
In the following example, we use the type-safe Workflow.newActivityStub within the "FileProcessingWorkflow" Workflow implementation to create a client-side stub of the FileProcessingActivities class. We also define ActivityOptions and set the setStartToCloseTimeout option to one hour.
public class FileProcessingWorkflowImpl implements FileProcessingWorkflow {
private final FileProcessingActivities activities;
public FileProcessingWorkflowImpl() {
this.activities = Workflow.newActivityStub(
FileProcessingActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofHours(1))
.build());
}
@Override
public void processFile(Arguments args) {
String localName = null;
String processedName = null;
try {
localName = activities.download(args.getSourceBucketName(), args.getSourceFilename());
processedName = activities.processFile(localName);
activities.upload(args.getTargetBucketName(), args.getTargetFilename(), processedName);
} finally {
if (localName != null) {
activities.deleteLocalFile(localName);
}
if (processedName != null) {
activities.deleteLocalFile(processedName);
}
}
}
// ...
}
A Workflow can have multiple Activity stubs. Each Activity stub can have its own ActivityOptions defined.
The following example shows a Workflow implementation with two typed Activity stubs.
public FileProcessingWorkflowImpl() {
ActivityOptions options1 = ActivityOptions.newBuilder()
.setTaskQueue("taskQueue1")
.setStartToCloseTimeout(Duration.ofMinutes(10))
.build();
this.store1 = Workflow.newActivityStub(FileProcessingActivities.class, options1);
ActivityOptions options2 = ActivityOptions.newBuilder()
.setTaskQueue("taskQueue2")
.setStartToCloseTimeout(Duration.ofMinutes(5))
.build();
this.store2 = Workflow.newActivityStub(FileProcessingActivities.class, options2);
}
To invoke Activities inside Workflows without referencing the interface it implements, use an untyped Activity stub, Workflow.newUntypedActivityStub.
This is useful when the Activity type is not known at compile time, or to invoke Activities implemented in different programming languages.
// Workflow code
ActivityOptions activityOptions =
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(3))
.setTaskQueue("simple-queue-node")
.build();
ActivityStub activity = Workflow.newUntypedActivityStub(activityOptions);
activity.execute("ComposeGreeting", String.class, "Hello World" , "Spanish");
Invoking Activities Asynchronously
Sometimes Workflows need to perform certain operations in parallel.
The Temporal Java SDK provides the Async class, which includes static methods used to invoke any Activity asynchronously.
The calls return a result of type Promise, which is similar to the Java Future and CompletionStage.
When invoking Activities, use Async.function for Activities that return a result, and Async.procedure for Activities that return void.
In the following asynchronous Activity invocation, the method reference is passed to Async.function followed by Activity arguments.
Promise<String> localNamePromise = Async.function(activities::download, sourceBucket, sourceFile);
The following example shows how to call two Activity methods, "download" and "upload", in parallel on multiple files.
public void processFile(Arguments args) {
List<Promise<String>> localNamePromises = new ArrayList<>();
List<String> processedNames = null;
try {
// Download all files in parallel.
for (String sourceFilename : args.getSourceFilenames()) {
Promise<String> localName =
Async.function(activities::download, args.getSourceBucketName(), sourceFilename);
localNamePromises.add(localName);
}
List<String> localNames = new ArrayList<>();
for (Promise<String> localName : localNamePromises) {
localNames.add(localName.get());
}
processedNames = activities.processFiles(localNames);
// Upload all results in parallel.
List<Promise<Void>> uploadedList = new ArrayList<>();
for (String processedName : processedNames) {
Promise<Void> uploaded =
Async.procedure(
activities::upload,
args.getTargetBucketName(),
args.getTargetFilename(),
processedName);
uploadedList.add(uploaded);
}
// Wait for all uploads to complete.
Promise.allOf(uploadedList).get();
} finally {
for (Promise<String> localNamePromise : localNamePromises) {
// Skip files that haven't completed downloading.
if (localNamePromise.isCompleted()) {
activities.deleteLocalFile(localNamePromise.get());
}
}
if (processedNames != null) {
for (String processedName : processedNames) {
activities.deleteLocalFile(processedName);
}
}
}
}
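The fan-out/fan-in shape of the example above can also be sketched in plain Python with asyncio. This is an analogy only, not Temporal SDK code: the functions download, process_files, upload, and delete_local_file are hypothetical stand-ins for Activities, and asyncio tasks stand in for Promises.

```python
import asyncio
from typing import List


# Hypothetical stand-ins for Activities; real Activities would perform I/O.
async def download(bucket: str, filename: str) -> str:
    await asyncio.sleep(0)  # yield control, as a real download would
    return f"/tmp/{bucket}-{filename}"


async def process_files(local_names: List[str]) -> List[str]:
    return [name + ".processed" for name in local_names]


async def upload(bucket: str, processed_name: str) -> None:
    await asyncio.sleep(0)


async def delete_local_file(name: str, deleted: List[str]) -> None:
    deleted.append(name)


async def process_file(source_bucket: str, target_bucket: str, filenames: List[str]) -> List[str]:
    deleted: List[str] = []
    processed: List[str] = []
    # Fan out: start every download before waiting on any of them.
    downloads = [asyncio.create_task(download(source_bucket, f)) for f in filenames]
    try:
        local_names = [await d for d in downloads]
        processed = await process_files(local_names)
        # Fan out the uploads, then wait for all of them to finish.
        await asyncio.gather(*(upload(target_bucket, p) for p in processed))
    finally:
        # Clean up only the downloads that completed successfully.
        for d in downloads:
            if d.done() and not d.cancelled() and d.exception() is None:
                await delete_local_file(d.result(), deleted)
        for p in processed:
            await delete_local_file(p, deleted)
    return deleted
```

Calling asyncio.run(process_file("src", "dst", ["a.txt", "b.txt"])) returns the list of cleaned-up file names, downloads first and processed files second.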
Activity Execution Context
ActivityExecutionContext is a context object passed to each Activity implementation by default.
You can access it in your Activity implementations via Activity.getExecutionContext().
It provides getters to access information about the Workflow that invoked the Activity.
Note that the Activity context information is stored in a thread-local variable.
Therefore, calls to getExecutionContext() succeed only within the thread that invoked the Activity function.
Following is an example of using the ActivityExecutionContext:
public class FileProcessingActivitiesImpl implements FileProcessingActivities {
@Override
public String download(String bucketName, String remoteName, String localName) {
ActivityExecutionContext ctx = Activity.getExecutionContext();
ActivityInfo info = ctx.getInfo();
log.info("namespace=" + info.getActivityNamespace());
log.info("workflowId=" + info.getWorkflowId());
log.info("runId=" + info.getRunId());
log.info("activityId=" + info.getActivityId());
log.info("activityTimeout=" + info.getStartToCloseTimeout());
return downloadFileFromS3(bucketName, remoteName, localDirectory + localName);
}
...
}
For details on getting the results of an Activity Execution, see Activity Execution Result.
An Activity implementation is a class that implements an Activity interface. In the following code example, the constructor takes an Amazon S3 client and a local directory. The upload method uploads a file to the S3 bucket. The download method downloads a file from the S3 bucket, taking a bucket name, remote name, and local name as arguments. The processFile method takes a local file name as an argument and returns a string.
// An implementation of an Activity interface.
class FileProcessingActivitiesImpl implements FileProcessingActivities {
private S3Client $s3Client;
private string $localDirectory;
public function __construct(S3Client $s3Client, string $localDirectory) {
$this->s3Client = $s3Client;
$this->localDirectory = $localDirectory;
}
// Uploading a file to S3.
public function upload(string $bucketName, string $localName, string $targetName): void
{
$this->s3Client->putObject(
$bucketName,
$targetName,
fopen($this->localDirectory . $localName, 'rb+')
);
}
// Downloading a file from S3.
public function download(
string $bucketName,
string $remoteName,
string $localName
): void
{
$this->s3Client->downloadObject(
$bucketName,
$remoteName,
fopen($this->localDirectory .$localName, 'wb+')
);
}
// A function that takes a local file name as an argument and returns a string.
public function processFile(string $localName): string
{
// Implementation omitted for brevity.
return compressFile($this->localDirectory . $localName);
}
public function deleteLocalFile(string $fileName): void
{
unlink($this->localDirectory . $fileName);
}
}
To spawn an Activity Execution, use the execute_activity()
operation from within your Workflow Definition.
@workflow.defn
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity, name, schedule_to_close_timeout=timedelta(seconds=5)
        )
execute_activity() is a shortcut for start_activity() that waits on its result.
To get just the handle to wait and cancel separately, use start_activity().
In most cases, use execute_activity() unless advanced task capabilities are needed.
A single argument to the Activity is positional. Multiple arguments are not supported in the type-safe form of start_activity() or execute_activity() and must be supplied by the args keyword argument.
To spawn an Activity Execution, you must retrieve the Activity handle in your Workflow.
import { proxyActivities } from '@temporalio/workflow';
// Only import the activity types
import type * as activities from './activities';
const { greet } = proxyActivities<typeof activities>({
startToCloseTimeout: '1 minute',
});
// A workflow that calls an activity
export async function example(name: string): Promise<string> {
return await greet(name);
}
This imports the individual Activities and declares the type alias for each Activity.
Required timeout
Activity Execution semantics rely on several parameters.
The only required value that needs to be set is either a Schedule-To-Close Timeout or a Start-To-Close Timeout. A Start-To-Close Timeout is the maximum time allowed for a single Activity Task Execution.
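The "at least one of two timeouts" rule can be sketched as a small validation helper. This is a hypothetical illustration in plain Python, not part of any Temporal SDK; the function name check_activity_timeouts and its parameters are assumptions made for the example.

```python
from datetime import timedelta
from typing import Optional


def check_activity_timeouts(
    schedule_to_close: Optional[timedelta] = None,
    start_to_close: Optional[timedelta] = None,
) -> None:
    """Raise ValueError unless at least one of the two timeouts is set."""
    if schedule_to_close is None and start_to_close is None:
        raise ValueError("set schedule_to_close or start_to_close (or both)")
```

Passing either timeout, or both, is accepted; passing neither raises ValueError.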
Get Activity results
The call to spawn an Activity Execution returns a handle, such as a Future or Promise depending on the SDK, that the Workflow uses to get the result. An Activity Execution is the full chain of Activity Task Executions.
- Go
- Java
- PHP
- Python
- TypeScript
The ExecuteActivity API call returns an instance of workflow.Future, which has the following two methods:
- Get(): Takes an instance of the workflow.Context that was passed to the Activity Execution, and a pointer, as parameters. The variable associated with the pointer is populated with the Activity Execution result. This call blocks until the results are available.
- IsReady(): Returns true when the result of the Activity Execution is ready.
Call the Get()
method on the instance of workflow.Future
to get the result of the Activity Execution.
The type of the result parameter must match the type of the return value declared by the Activity function.
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) (YourWorkflowResponse, error) {
// ...
future := workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam)
var yourActivityResult YourActivityResult
if err := future.Get(ctx, &yourActivityResult); err != nil {
// ...
}
// ...
}
Use the IsReady()
method first to make sure the Get()
call doesn't cause the Workflow Execution to wait on the result.
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) (YourWorkflowResponse, error) {
// ...
future := workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam)
// ...
if future.IsReady() {
var yourActivityResult YourActivityResult
if err := future.Get(ctx, &yourActivityResult); err != nil {
// ...
}
}
// ...
}
It is idiomatic to invoke multiple Activity Executions from within a Workflow. Therefore, it is also idiomatic to either block on the results of the Activity Executions or continue on to execute additional logic, checking for the Activity Execution results at a later time.
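The two styles described above, blocking on a result versus checking for it later, can be sketched with plain asyncio. This is an analogy rather than Temporal SDK code: an asyncio task stands in for the future returned when an Activity is spawned, and your_activity and your_workflow are hypothetical names.

```python
import asyncio


async def your_activity(param: str) -> str:
    # Hypothetical stand-in for an Activity Execution.
    await asyncio.sleep(0.01)
    return f"result for {param}"


async def your_workflow(param: str) -> dict:
    # Start the work and keep a handle instead of waiting immediately.
    handle = asyncio.create_task(your_activity(param))
    # Non-blocking readiness check, comparable in spirit to IsReady().
    ready_before = handle.done()
    # ... other logic could run here while the work is in flight ...
    # Blocking wait for the result, comparable in spirit to Get().
    result = await handle
    return {
        "ready_before": ready_before,
        "ready_after": handle.done(),
        "result": result,
    }
```

The readiness check is False immediately after the task is created and True once the result has been awaited.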
To get the results of an asynchronously invoked Activity method, use the Promise
get
method to block until the Activity method result is available.
Sometimes an Activity Execution lifecycle goes beyond a synchronous method invocation. For example, a request can be put in a queue and later a reply comes and is picked up by a different Worker process. The whole request-reply interaction can be modeled as a single Activity.
To indicate that an Activity should not be completed upon its method return, call ActivityExecutionContext.doNotCompleteOnReturn()
from the original Activity thread.
Then later, when replies come, complete the Activity using the ActivityCompletionClient
.
To correlate Activity invocation with completion use either a TaskToken
or Workflow and Activity IDs.
Following is an example of using ActivityExecutionContext.doNotCompleteOnReturn()
:
public class FileProcessingActivitiesImpl implements FileProcessingActivities {
public String download(String bucketName, String remoteName, String localName) {
ActivityExecutionContext ctx = Activity.getExecutionContext();
// Used to correlate reply
byte[] taskToken = ctx.getInfo().getTaskToken();
asyncDownloadFileFromS3(taskToken, bucketName, remoteName, localDirectory + localName);
ctx.doNotCompleteOnReturn();
// Return value is ignored when doNotCompleteOnReturn was called.
return "ignored";
}
...
}
When the download is complete, the download service can complete the Activity or fail it from a different process, for example:
public <R> void completeActivity(byte[] taskToken, R result) {
completionClient.complete(taskToken, result);
}
public void failActivity(byte[] taskToken, Exception failure) {
completionClient.completeExceptionally(taskToken, failure);
}
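The correlation idea behind this pattern can be sketched in plain Python: a pending result is stored under a task token, the original call returns without completing it, and a later reply completes or fails it by token. This is a toy model, not the Temporal completion client; CompletionRegistry and its methods are hypothetical names.

```python
import asyncio
from typing import Dict


class CompletionRegistry:
    """Toy stand-in for a completion client: maps task tokens to pending results."""

    def __init__(self) -> None:
        self._pending: Dict[bytes, asyncio.Future] = {}

    def register(self, task_token: bytes) -> asyncio.Future:
        # Called when the work starts; the result is not known yet.
        future = asyncio.get_running_loop().create_future()
        self._pending[task_token] = future
        return future

    def complete(self, task_token: bytes, result) -> None:
        self._pending.pop(task_token).set_result(result)

    def fail(self, task_token: bytes, failure: Exception) -> None:
        self._pending.pop(task_token).set_exception(failure)


async def demo() -> list:
    registry = CompletionRegistry()
    ok = registry.register(b"token-1")
    bad = registry.register(b"token-2")
    # The original calls have already returned; replies arrive later.
    registry.complete(b"token-1", "/tmp/file")
    registry.fail(b"token-2", RuntimeError("download failed"))
    outcomes = [await ok]
    try:
        await bad
    except RuntimeError as err:
        outcomes.append(str(err))
    return outcomes
```

In a real deployment the reply would come from a different process, so the token (or the Workflow and Activity IDs) is the only link between the request and its completion.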
Workflow::newActivityStub returns a client-side stub that implements an Activity interface. The client-side stub can be used within the Workflow code. It takes the Activity's type and ActivityOptions as arguments.
Calling (via yield) a method on this interface invokes an Activity that implements this method.
An Activity invocation synchronously blocks until the Activity completes, fails, or times out.
Even if Activity execution takes a few months, the Workflow code still sees it as a single synchronous invocation.
It doesn't matter what happens to the processes that host the Workflow.
The business logic code just sees a single method call.
class GreetingWorkflow implements GreetingWorkflowInterface
{
private $greetingActivity;
public function __construct()
{
$this->greetingActivity = Workflow::newActivityStub(
GreetingActivityInterface::class,
ActivityOptions::new()->withStartToCloseTimeout(\DateInterval::createFromDateString('30 seconds'))
);
}
public function greet(string $name): \Generator
{
// This is a blocking call that returns only after the activity has completed.
return yield $this->greetingActivity->composeGreeting('Hello', $name);
}
}
If different Activities need different options, like timeouts or a task queue, multiple client-side stubs can be created with different options.
// Distinct variable names so that the second stub does not overwrite the first.
$shortGreetingActivity = Workflow::newActivityStub(
GreetingActivityInterface::class,
ActivityOptions::new()->withStartToCloseTimeout(\DateInterval::createFromDateString('30 seconds'))
);
$longGreetingActivity = Workflow::newActivityStub(
GreetingActivityInterface::class,
ActivityOptions::new()->withStartToCloseTimeout(\DateInterval::createFromDateString('30 minutes'))
);
Use start_activity() to start an Activity and return its handle, ActivityHandle. Use execute_activity() to start an Activity and return its result.
You must provide either schedule_to_close_timeout or start_to_close_timeout.
execute_activity() is an asynchronous helper that takes the same arguments as start_activity() and awaits the result. Use execute_activity() in most cases unless advanced task capabilities are needed.
@workflow.defn
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity, name, schedule_to_close_timeout=timedelta(seconds=5)
        )
Since Activities are referenced by their string name, you can reference them dynamically to get the result of an Activity Execution.
export async function DynamicWorkflow(activityName, ...args) {
const acts = proxyActivities(/* activityOptions */);
// these are equivalent
await acts.activity1();
await acts['activity1']();
let result = await acts[activityName](...args);
return result;
}
The proxyActivities()
returns an object that calls the Activities in the function. acts[activityName]()
references the Activity using the Activity name, then it returns the results.
Run Worker Processes
The Worker Process is responsible for polling a Task Queue, dequeueing a Task, executing your code in response to a Task, and responding to the Temporal Server with the results.
- Each Worker Entity in the Worker Process must register the exact Workflow Types and Activity Types it may execute.
- Each Worker Entity must also associate itself with exactly one Task Queue. A Task Queue is a first-in, first-out queue that a Worker Process polls for Tasks.
- Each Worker Entity polling the same Task Queue must be registered with the same Workflow Types and Activity Types.
A Worker Entity is the individual Worker within a Worker Process that listens to a specific Task Queue.
Although multiple Worker Entities can be in a single Worker Process, a Worker Process with a single Worker Entity may be perfectly sufficient. For more information, see the Worker tuning guide.
A Worker Entity contains both a Workflow Worker and an Activity Worker so that it can make progress for either a Workflow Execution or an Activity Execution.
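The registration rules above can be sketched as a small consistency check. This is a toy model in plain Python, not Temporal SDK code; WorkerEntity and mismatched_task_queues are hypothetical names chosen for the illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, FrozenSet, List


@dataclass(frozen=True)
class WorkerEntity:
    task_queue: str  # exactly one Task Queue per Worker Entity
    workflow_types: FrozenSet[str] = field(default_factory=frozenset)
    activity_types: FrozenSet[str] = field(default_factory=frozenset)


def mismatched_task_queues(workers: List[WorkerEntity]) -> List[str]:
    """Return Task Queues whose Worker Entities registered different type sets."""
    seen: Dict[str, tuple] = {}
    bad = set()
    for w in workers:
        registration = (w.workflow_types, w.activity_types)
        # The first Worker Entity seen for a queue defines the expected set.
        if seen.setdefault(w.task_queue, registration) != registration:
            bad.add(w.task_queue)
    return sorted(bad)
```

A check like this could run in a deployment test to catch two Worker Entities that poll the same Task Queue with different registrations.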
- Go
- Java
- PHP
- Python
- TypeScript
Create an instance of Worker by calling worker.New(), available through the go.temporal.io/sdk/worker package, and pass it the following parameters:
- An instance of the Temporal Go SDK Client.
- The name of the Task Queue that it will poll.
- An instance of worker.Options, which can be empty.
Then, register the Workflow Types and the Activity Types that the Worker will be capable of executing.
Lastly, call either the Start() or the Run() method on the instance of the Worker.
Run accepts an interrupt channel as a parameter, so that the Worker can be stopped in the terminal.
Otherwise, the Stop() method must be called to stop the Worker.
package main
import (
"context"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)
func main() {
c, err := client.Dial(client.Options{})
if err != nil {
// ...
}
defer c.Close()
w := worker.New(c, "your-task-queue", worker.Options{})
w.RegisterWorkflow(YourWorkflowDefinition)
w.RegisterActivity(YourActivityDefinition)
err = w.Run(worker.InterruptCh())
if err != nil {
// ...
}
// ...
}
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) (YourWorkflowResponse, error) {
// ...
}
func YourActivityDefinition(ctx context.Context, param YourActivityParam) (YourActivityResponse, error) {
// ...
}
If you have gow
installed, the Worker Process automatically "reloads" when you update the Worker file:
go install github.com/mitranim/gow@latest
gow run worker/main.go # automatically reload when file changed
Use the newWorker
method on an instance of a WorkerFactory
to create a new Worker in Java.
A single Worker Entity can contain many Worker Objects.
Call the start()
method on the instance of the WorkerFactory
to start all the Workers created in this process.
// ...
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
public class YourWorker {
public static void main(String[] args) {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
WorkerFactory factory = WorkerFactory.newInstance(client);
Worker yourWorker = factory.newWorker("your_task_queue");
// Register Workflow
// and/or register Activities
factory.start();
}
}
After creating the Worker entity, register all Workflow Types and all Activity Types that the Worker can execute. A Worker can be registered with just Workflows, just Activities, or both.
The RoadRunner application server launches multiple Temporal PHP Worker processes based on the provided .rr.yaml configuration.
Each Worker can connect to one or multiple Task Queues. Workers poll the Temporal service for tasks, perform those tasks, and communicate task execution results back to the Temporal service.
Worker code is developed, deployed, and operated by Temporal customers.
To create a Worker, use Temporal\WorkerFactory:
<?php
declare(strict_types=1);
use Temporal\WorkerFactory;
ini_set('display_errors', 'stderr');
include "vendor/autoload.php";
// factory initiates and runs task queue specific activity and workflow workers
$factory = WorkerFactory::create();
// Worker that listens on a Task Queue and hosts both workflow and activity implementations.
$worker = $factory->newWorker();
// Workflows are stateful. So you need a type to create instances.
$worker->registerWorkflowTypes(App\DemoWorkflow::class);
// Activities are stateless and thread safe. So a shared instance is used.
$worker->registerActivity(App\DemoActivity::class);
// In case an activity class requires some external dependencies provide a callback - factory
// that creates or builds a new activity instance. The factory should be a callable which accepts
// an instance of ReflectionClass with an activity class which should be created.
$worker->registerActivity(App\DemoActivity::class, fn(ReflectionClass $class) => $container->create($class->getName()));
// start primary loop
$factory->run();
You can configure the Task Queue name using the first argument of WorkerFactory->newWorker:
$worker = $factory->newWorker('your-task-queue');
As mentioned above you can create as many Task Queue connections inside a single Worker Process as you need.
To configure additional WorkerOptions use Temporal\Worker\WorkerOptions
:
use Temporal\Worker\WorkerOptions;
$worker = $factory->newWorker(
'your-task-queue',
WorkerOptions::new()
->withMaxConcurrentWorkflowTaskPollers(10)
);
Make sure to point to the Worker file in the application server configuration:
rpc:
  listen: tcp://127.0.0.1:6001
server:
  command: "php worker.php"
temporal:
  address: "temporal:7233"
  activities:
    num_workers: 10
You can serve HTTP endpoints using the same server setup.
To develop a Worker, use the Worker()
constructor and add your Client, Task Queue, Workflows, and Activities as arguments.
The following code example creates a Worker that polls for tasks from the Task Queue and executes the Workflow.
worker = Worker(
client,
task_queue="your-task-queue",
workflows=[YourWorkflow],
activities=[your_activity],
)
The following code example shows a Worker hosting Workflows and Activities.
async def run_worker(stop_event: asyncio.Event):
    # Create Client connected to server at the given address
    client = await Client.connect("127.0.0.1:7233", namespace="your-custom-namespace")
    # Run the worker until the event is set
    worker = Worker(
        client,
        task_queue="your-task-queue",
        workflows=[YourWorkflow],
        activities=[your_activity],
    )
    async with worker:
        await stop_event.wait()
The stop_event parameter is an asyncio.Event that is set when the Worker should stop.
Although this example accepts a stop event and uses async with
, you can also use run()
and shutdown()
.
The shutdown()
operation waits on all Activities to complete, so if a long-running Activity does not at least respect cancellation, the shutdown might never complete.
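Why respecting cancellation matters for shutdown can be sketched with plain asyncio. This is a generic illustration, not Temporal SDK code: long_running_activity and shutdown_demo are hypothetical names, and an asyncio task stands in for a running Activity.

```python
import asyncio


async def long_running_activity(progress: list) -> None:
    try:
        while True:
            progress.append("tick")
            # Cancellation is delivered at await points like this one.
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        progress.append("cancelled")
        raise  # re-raise so the caller can see the cancellation


async def shutdown_demo() -> list:
    progress: list = []
    task = asyncio.create_task(long_running_activity(progress))
    await asyncio.sleep(0.03)
    task.cancel()  # request cancellation, then wait for the task to finish
    try:
        await task
    except asyncio.CancelledError:
        pass
    return progress
```

If the loop swallowed the cancellation and kept running, the final wait would never return, which mirrors a shutdown that never completes.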
Create a Worker with Worker.create()
(which establishes the initial gRPC connection), then call worker.run()
on it (to start polling the Task Queue).
Below is an example of starting a Worker that polls the Task Queue named hello-world.
import { Worker } from '@temporalio/worker';
import * as activities from './activities';
async function run() {
// Step 1: Register Workflows and Activities with the Worker and connect to
// the Temporal server.
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
activities,
taskQueue: 'hello-world',
});
// Worker connects to localhost by default and uses console.error for logging.
// Customize the Worker by passing more options to create():
// https://typescript.temporal.io/api/classes/worker.Worker
// If you need to configure server connection parameters, see docs:
// https://docs.temporal.io/typescript/security#encryption-in-transit-with-mtls
// Step 2: Start accepting tasks on the `hello-world` queue
await worker.run();
}
run().catch((err) => {
console.error(err);
process.exit(1);
});
taskQueue
is the only required option, but you will also use workflowsPath
and activities
to register Workflows and Activities with the Worker.
A full example for Workers looks like this:
import { NativeConnection, Worker } from '@temporalio/worker';
import * as activities from './activities';
async function run() {
const connection = await NativeConnection.connect({
// defaults port to 7233 if not specified
address: 'foo.bar.tmprl.cloud',
tls: {
// set to true if TLS without mTLS
// See docs for other TLS options
clientCertPair: {
crt: clientCert,
key: clientKey,
},
},
});
const worker = await Worker.create({
connection,
namespace: 'foo.bar', // as explained in Namespaces section
// ...
});
await worker.run();
}
run().catch((err) => {
console.error(err);
process.exit(1);
});
See below for more Worker options.
Workflow and Activity registration
Workers bundle Workflow code and node_modules
using Webpack v5 and execute them inside V8 isolates.
Activities are directly required and run by Workers in the Node.js environment.
Workers are very flexible – you can host any or all of your Workflows and Activities on a Worker, and you can host multiple Workers in a single machine.
There are three main things the Worker needs:
- taskQueue: the Task Queue to poll. This is the only required argument.
- activities: Optional. Imported and supplied directly to the Worker. Not the path.
- Workflow bundle:
  - Either specify a workflowsPath to your workflows.ts file to pass to Webpack, e.g., require.resolve('./workflows'). Workflows will be bundled with their dependencies.
  - Or pass a prebuilt bundle to workflowBundle instead if you prefer to handle the bundling yourself.
Additional Worker Options
This is a selected subset of options you are likely to use. Even more advanced options, particularly for performance tuning, are available in the API reference.
Options | Description |
---|---|
dataConverter | Encodes and decodes data entering and exiting a Temporal Server. Supports undefined, Uint8Array, and JSON. |
sinks | Allows injection of Workflow Sinks (Advanced feature: see Logging docs) |
interceptors | A mapping of interceptor type to a list of factories or module paths (Advanced feature: see Interceptors) |
Register types
All Workers listening to the same Task Queue name must be registered to handle the exact same Workflow Types and Activity Types.
If a Worker polls a Task for a Workflow Type or Activity Type it does not know about, it fails that Task. However, the failure of the Task does not cause the associated Workflow Execution to fail.
- Go
- Java
- PHP
- Python
- TypeScript
The RegisterWorkflow()
and RegisterActivity()
calls essentially create an in-memory mapping between the Workflow Types and their implementations, inside the Worker process.
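The idea of an in-memory mapping between type names and implementations can be sketched in plain Python. This is a toy model, not the SDK's registry: TypeRegistry, handle_task, and compose_greeting are hypothetical names. It also illustrates that an unknown type fails only the Task being handled.

```python
from typing import Callable, Dict, Tuple


class TypeRegistry:
    """Toy in-memory mapping from a type name to its implementation."""

    def __init__(self) -> None:
        self._implementations: Dict[str, Callable] = {}

    def register(self, implementation: Callable) -> None:
        # Assumption for this sketch: the function name is the type name.
        name = implementation.__name__
        if name in self._implementations:
            raise ValueError(f"type {name!r} is already registered")
        self._implementations[name] = implementation

    def handle_task(self, type_name: str, *args) -> Tuple[str, object]:
        implementation = self._implementations.get(type_name)
        if implementation is None:
            # Only this Task fails; nothing here fails the Workflow Execution.
            return ("task_failed", f"unknown type {type_name!r}")
        return ("task_completed", implementation(*args))


def compose_greeting(greeting: str, name: str) -> str:
    return f"{greeting}, {name}!"
```

Registering compose_greeting and then handling a Task for "compose_greeting" dispatches to the function, while a Task for an unregistered name yields a failed Task result.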
Registering Activity structs
Per the Activity Definition guidance, an Activity Definition in the Temporal Go SDK is an exportable function or struct method. When you call RegisterActivity() for an Activity struct, that Worker has access to all exported methods.
Registering multiple Types
To register multiple Activity Types and/or Workflow Types with the Worker Entity, make multiple registration calls, and make sure each Type name is unique:
w.RegisterActivity(ActivityA)
w.RegisterActivity(ActivityB)
w.RegisterActivity(ActivityC)
w.RegisterWorkflow(WorkflowA)
w.RegisterWorkflow(WorkflowB)
w.RegisterWorkflow(WorkflowC)
Use worker.registerWorkflowImplementationTypes to register a Workflow Type and worker.registerActivitiesImplementations to register an Activity implementation with a Worker.
For Workflows, the Workflow Type is registered with a Worker. A Workflow Type can be registered only once per Worker entity. If you define multiple Workflow implementations of the same type, you get an exception at the time of registration.
For Activities, Activity implementation instances are registered with a Worker because they are stateless and thread-safe. You can pass any number of dependencies in the Activity implementation constructor, such as the database connections, services, etc.
The following example shows how to register a Workflow and an Activity with a Worker.
Worker worker = workerFactory.newWorker("your_task_queue");
...
// Register Workflow
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
// Register Activity
worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
When you register a single instance of an Activity, you can have multiple instances of Workflow Executions calling the same Activity. Activity code must be thread-safe because the same instance of the Activity code is run for every Workflow Execution that calls it.
For DynamicWorkflow
, only one Workflow implementation that extends DynamicWorkflow
can be registered with a Worker.
The following example shows how to register the DynamicWorkflow
and DynamicActivity
implementation with a Worker.
public static void main(String[] arg) {
  WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
  WorkflowClient client = WorkflowClient.newInstance(service);
  WorkerFactory factory = WorkerFactory.newInstance(client);
  Worker worker = factory.newWorker(TASK_QUEUE);
  /* Register the Dynamic Workflow implementation with the Worker. Workflow implementations
  ** must be known to the Worker at runtime to dispatch Workflow Tasks.
  */
  worker.registerWorkflowImplementationTypes(DynamicGreetingWorkflowImpl.class);
  /* Register the Dynamic Activity implementation with the Worker. Because Activities are
  ** stateless and thread-safe, a shared instance is registered. Register before starting
  ** the Workers.
  */
  worker.registerActivitiesImplementations(new DynamicGreetingActivityImpl());
  // Start all the Workers that are in this process.
  factory.start();
  /* Create the Workflow stub. Note that the Workflow Type is not explicitly registered with the Worker. */
  WorkflowOptions workflowOptions =
      WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId(WORKFLOW_ID).build();
  WorkflowStub workflow = client.newUntypedWorkflowStub("DynamicWF", workflowOptions);
  /* Start Workflow Execution and immediately send Signal. Pass in the Workflow args and Signal args. */
  workflow.signalWithStart("greetingSignal", new Object[] {"John"}, new Object[] {"Hello"});
  // Wait for the Workflow to finish, then get the result.
  String result = workflow.getResult(String.class);
  System.out.println(result);
  System.exit(0);
}
You can register multiple type-specific Workflow implementations alongside a single DynamicWorkflow
implementation.
You can register only one Activity instance that implements DynamicActivity
with a Worker.
A Worker listens on a Task Queue and hosts both Workflow and Activity implementations:
// Workflows are stateful. So you need a type to create instances:
$worker->registerWorkflowTypes(App\DemoWorkflow::class);
// Activities are stateless and thread safe:
$worker->registerActivity(App\DemoActivity::class);
If an Activity class requires external dependencies, provide a factory callback that creates or builds a new Activity instance. The factory should be a callable that accepts an instance of ReflectionClass describing the Activity class to create.
$worker->registerActivity(
App\DemoActivity::class,
fn(ReflectionClass $class) => $container->create($class->getName())
);
If you want to clean up resources after an Activity is done, you can register a finalizer. This callback is called after each Activity invocation:
$worker->registerActivityFinalizer(fn() => $kernel->shutdown());
When a Worker
is created, it accepts a list of Workflows in the workflows
parameter, a list of Activities in the activities
parameter, or both.
worker = Worker(
client,
task_queue="your-task-queue",
workflows=[YourWorkflow1, YourWorkflow2],
activities=[your_activity_1, your_activity_2],
)
In development, use workflowsPath
:
import { Worker } from '@temporalio/worker';
import * as activities from './activities';
async function run() {
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
taskQueue: 'snippets',
activities,
});
await worker.run();
}
In this snippet, the Worker bundles the Workflow code at runtime.
In production, you can improve your Worker's startup time by bundling in advance: as part of your production build, call bundleWorkflowCode
:
production/src/scripts/build-workflow-bundle.ts
import { bundleWorkflowCode } from '@temporalio/worker';
import { writeFile } from 'fs/promises';
import path from 'path';
async function bundle() {
const { code } = await bundleWorkflowCode({
workflowsPath: require.resolve('../workflows'),
});
const codePath = path.join(__dirname, '../../workflow-bundle.js');
await writeFile(codePath, code);
console.log(`Bundle written to ${codePath}`);
}
Then the bundle can be passed to the Worker:
const workflowOption = () =>
process.env.NODE_ENV === 'production'
? {
workflowBundle: {
codePath: require.resolve('../workflow-bundle.js'),
},
}
: { workflowsPath: require.resolve('./workflows') };
async function run() {
const worker = await Worker.create({
...workflowOption(),
activities,
taskQueue: 'production-sample',
});
await worker.run();
}
Start Workflow Execution
A Temporal Workflow Execution is a durable, scalable, reliable, and reactive function execution. It is the main unit of execution of a Temporal Application.
In the examples below, all Workflow Executions are started using a Temporal Client. To spawn Workflow Executions from within another Workflow Execution, use either the Child Workflow or External Workflow APIs.
See the Customize Workflow Type section to see how to customize the name of the Workflow Type.
A request to spawn a Workflow Execution causes the Temporal Cluster to create the first Event (WorkflowExecutionStarted) in the Workflow Execution Event History. The Temporal Cluster then creates the first Workflow Task, resulting in the first WorkflowTaskScheduled Event.
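The ordering described above can be sketched as a toy model in plain Python. This is not Temporal Server code: start_workflow_execution is a hypothetical function, and the dictionaries are simplified stand-ins for Events in an Event History.

```python
from typing import List


def start_workflow_execution(workflow_type: str) -> List[dict]:
    """Toy model: return the first Events that a start request would record."""
    history: List[dict] = []
    # The first Event records that the Workflow Execution started.
    history.append({"type": "WorkflowExecutionStarted", "workflow_type": workflow_type})
    # Creating the first Workflow Task records the scheduling Event.
    history.append({"type": "WorkflowTaskScheduled"})
    return history
```

The returned list keeps the Events in the order they are recorded, so the started Event always precedes the first scheduled Workflow Task.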
- Go
- Java
- PHP
- Python
- TypeScript
To spawn a Workflow Execution, use the ExecuteWorkflow() method on the Go SDK Client.
The ExecuteWorkflow()
API call requires an instance of context.Context
, an instance of StartWorkflowOptions
, a Workflow Type name, and all variables to be passed to the Workflow Execution.
The ExecuteWorkflow()
call returns a Future, which can be used to get the result of the Workflow Execution.
package main
import (
"context"
// ...
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
)
func main() {
temporalClient, err := client.Dial(client.Options{})
if err != nil {
// ...
}
defer temporalClient.Close()
// ...
workflowOptions := client.StartWorkflowOptions{
// ...
}
workflowRun, err := temporalClient.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition, param)
if err != nil {
// ...
}
// ...
}
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) (YourWorkflowResponse, error) {
// ...
}
If the invocation process has access to the function directly, then the Workflow Type name parameter can be passed as if the function name were a variable, without quotations.
workflowRun, err := temporalClient.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition, param)
If the invocation process does not have direct access to the statically defined Workflow Definition (for example, if the Workflow Definition is in an un-importable package, or it is written in a completely different language), then the Workflow Type can be provided as a string.
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, "YourWorkflowDefinition", param)
Use WorkflowStub to start a Workflow Execution from within a Client, and ExternalWorkflowStub to start a different Workflow Execution from within a Workflow.
To send Signals to a Workflow Execution whose status is unknown, use SignalWithStart with a WorkflowStub in the Client code (see How to send a Signal-with-Start in Java).
Using WorkflowStub
WorkflowStub is a proxy generated by the WorkflowClient.
Each time a new Workflow Execution is started, an instance of the Workflow implementation object is created.
Then, one of the methods (depending on the Workflow Type of the instance) annotated with @WorkflowMethod can be invoked.
As soon as this method returns, the Workflow Execution is considered to be complete.
You can use a typed or untyped WorkflowStub in the client code.
- Typed WorkflowStub instances are useful because they are type safe and allow you to invoke your Workflow methods, such as @WorkflowMethod, @QueryMethod, and @SignalMethod, directly.
- An untyped WorkflowStub does not use the Workflow interface, and is not type safe. It is more flexible because it has methods from the WorkflowStub interface, such as start, signalWithStart, getResult (sync and async), query, signal, cancel, and terminate. Note that the Temporal Java SDK also provides typed WorkflowStub versions for these methods. When using an untyped WorkflowStub, we rely on the Workflow Type, Activity Type, Child Workflow Type, as well as Query and Signal names. For details, see Temporal Client.
To initialize a Workflow Client, create an instance of a WorkflowClient, create a client-side WorkflowStub, and then call a Workflow method (annotated with the @WorkflowMethod annotation).
A Workflow Execution can be started either synchronously or asynchronously.
Synchronous invocation starts a Workflow and then waits for its completion. If the process that started the Workflow crashes or stops waiting, the Workflow continues executing. Because Workflows are potentially long-running, and Client crashes happen, synchronous invocation is not commonly used in production. The following example is a type-safe approach for starting a Workflow Execution synchronously.
NotifyUserAccounts workflow = client.newWorkflowStub(
NotifyUserAccounts.class,
WorkflowOptions.newBuilder()
.setWorkflowId("notifyAccounts")
.setTaskQueue(taskQueue)
.build()
);
// start the Workflow and wait for a result.
workflow.notify(new String[] { "Account1", "Account2", "Account3", "Account4", "Account5",
"Account6", "Account7", "Account8", "Account9", "Account10"});
// notify(String[] accountIds) is a Workflow method defined in the Workflow Definition.
Asynchronous start initiates a Workflow Execution and immediately returns to the caller. This is the most common way to start Workflows in production code. The WorkflowClient (https://github.com/temporalio/sdk-java/blob/master/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java) provides some static methods, such as start, execute, and signalWithStart, that help with starting your Workflows asynchronously.
The following examples show how to start Workflow Executions asynchronously, with either typed or untyped WorkflowStub.
Typed WorkflowStub Example
// create typed Workflow stub
FileProcessingWorkflow workflow = client.newWorkflowStub(FileProcessingWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(taskQueue)
.setWorkflowId(workflowId)
.build());
// use WorkflowClient.execute to return a future that contains the Workflow result or failure, or
// use WorkflowClient.start to return the WorkflowId and RunId of the started Workflow.
WorkflowClient.start(workflow::greetCustomer);
Untyped WorkflowStub Example
WorkflowStub untyped = client.newUntypedWorkflowStub("FileProcessingWorkflow",
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setTaskQueue(taskQueue)
.build());
// blocks until Workflow Execution has been started (not until it completes)
untyped.start(argument);
You can call a Dynamic Workflow implementation using an untyped WorkflowStub.
The following example shows how to call the Dynamic Workflow implementation in the Client code.
WorkflowClient client = WorkflowClient.newInstance(service);
/**
* Note that for this part of the client code, the dynamic Workflow implementation must
* be known to the Worker at runtime in order to dispatch Workflow tasks, and may be defined
* in the Worker definition as:*/
// worker.registerWorkflowImplementationTypes(DynamicGreetingWorkflowImpl.class);
/* Create the Workflow stub to call the dynamic Workflow.
* Note that the Workflow Type is not explicitly registered with the Worker.*/
WorkflowOptions workflowOptions =
WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId(WORKFLOW_ID).build();
WorkflowStub workflow = client.newUntypedWorkflowStub("DynamicWF", workflowOptions);
DynamicWorkflow can be used to invoke different Workflow Types.
To check what type is running when your Dynamic Workflow execute method runs, use getWorkflowType() in the implementation code.
String type = Workflow.getInfo().getWorkflowType();
See Workflow Execution Result for details on how to get the result of a Workflow Execution in Java.
Using ExternalWorkflowStub
Use ExternalWorkflowStub within a Workflow to invoke, and send Signals to, other Workflows by type.
This helps particularly for executing Workflows written in other language SDKs, as shown in the following example.
@Override
public String yourWFMethod(String name) {
ExternalWorkflowStub callOtherWorkflow = Workflow.newUntypedExternalWorkflowStub("OtherWFId");
}
See the Temporal Polyglot code for examples of executing Workflows written in other language SDKs.
Recurring start
You can start a Workflow Execution on a regular schedule by using the setCronSchedule Workflow option in the Client code.
Workflows can be started both synchronously and asynchronously. You can use the typed or untyped Workflow stubs available via Temporal\Client\WorkflowClient. To create a Workflow Client:
use Temporal\Client\GRPC\ServiceClient;
use Temporal\Client\WorkflowClient;
$workflowClient = WorkflowClient::create(ServiceClient::create('localhost:7233'));
Synchronous start
A synchronous start initiates a Workflow and then waits for its completion. The started Workflow will not rely on the invocation process and will continue executing even if the waiting process crashes or stops.
Make sure you have the Workflow interface or class name that you want to start. For example:
#[WorkflowInterface]
interface AccountTransferWorkflowInterface
{
#[WorkflowMethod(name: "MoneyTransfer")]
#[ReturnType('int')]
public function transfer(
string $fromAccountId,
string $toAccountId,
string $referenceId,
int $amountCents
);
}
To start this Workflow synchronously:
$accountTransfer = $workflowClient->newWorkflowStub(
AccountTransferWorkflowInterface::class
);
$result = $accountTransfer->transfer(
'fromID',
'toID',
'refID',
1000
);
Asynchronous start
An asynchronous start initiates a Workflow execution and immediately returns to the caller without waiting for a result. This is the most common way to start Workflows in a live environment.
To start a Workflow asynchronously, pass the Workflow stub instance and the start parameters to the WorkflowClient->start method.
$accountTransfer = $workflowClient->newWorkflowStub(
AccountTransferWorkflowInterface::class
);
$run = $workflowClient->start($accountTransfer, 'fromID', 'toID', 'refID', 1000);
Once the Workflow is started, you can get its Workflow Id from the WorkflowRun object returned by the start method:
$run = $workflowClient->start($accountTransfer, 'fromID', 'toID', 'refID', 1000);
var_dump($run->getExecution()->getID());
Recurring start
You can start a Workflow Execution on a regular schedule with the CronSchedule option.
To start a Workflow Execution in Python, use either the start_workflow() or execute_workflow() asynchronous methods in the Client.
The following code example starts a Workflow and returns its handle.
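from temporalio.client import Client

async def main():
    client = await Client.connect("127.0.0.1:7233", namespace="your-custom-namespace")
    # start_workflow() returns a handle as soon as the Workflow Execution is started
    handle = await client.start_workflow(
        "your-workflow-name",
        "some arg",
        id="your-workflow-id",
        task_queue="your-task-queue",
    )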
The following code example starts a Workflow and waits for completion.
from temporalio.client import Client

async def main():
    client = await Client.connect("127.0.0.1:7233", namespace="your-custom-namespace")
    # execute_workflow() waits for completion and returns the Workflow result
    result = await client.execute_workflow(
        "your-workflow-name",
        "some arg",
        id="your-workflow-id",
        task_queue="your-task-queue",
    )
When you have a Workflow Client, you can schedule the start of a Workflow with client.start(), specifying workflowId, taskQueue, and args. The call returns a Workflow handle immediately after the Server acknowledges receipt.
const handle = await client.start(example, {
workflowId: 'your-workflow-id',
taskQueue: 'your-task-queue',
args: ['argument01', 'argument02', 'argument03'], // this is typechecked against workflowFn's args
});
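// Later, possibly from another process, get a handle to the same Workflow by its Id
const existingHandle = client.getHandle('your-workflow-id');
const result = await existingHandle.result();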
Calling client.start() or client.execute() sends a command to Temporal Server to schedule a new Workflow Execution on the specified Task Queue. The Workflow Execution does not actually start until a Worker that has a matching Workflow Type, and is polling that Task Queue, picks it up.
You can test this by executing a Workflow Client command without a matching Worker. Temporal Server records the command in Event History, but does not make progress with the Workflow Execution until a Worker starts polling with a matching Task Queue and Workflow Definition.
Workflow Executions run in a separate V8 isolate context in order to provide a deterministic runtime.
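The relationship between starting a Workflow and a Worker polling the matching Task Queue can be sketched with a toy in-memory model. This is a simplified illustration under stated assumptions: `ToyServer`, `start_workflow`, and `poll` are hypothetical names made up for this sketch, and the real Temporal Server matches Tasks to Workers very differently.

```python
from collections import defaultdict, deque


class ToyServer:
    """Toy stand-in for Task Queues on a server (hypothetical, not the real API)."""

    def __init__(self):
        # One FIFO queue per Task Queue name, created on first use.
        self.task_queues = defaultdict(deque)

    def start_workflow(self, workflow_id: str, task_queue: str) -> None:
        # Starting only schedules a Task on the named queue; nothing runs yet.
        self.task_queues[task_queue].append(workflow_id)

    def poll(self, task_queue: str):
        # A Worker only receives Tasks from the queue name it polls.
        queue = self.task_queues[task_queue]
        return queue.popleft() if queue else None


server = ToyServer()
server.start_workflow("your-workflow-id", task_queue="your-task-queue")

# A Worker polling a different Task Queue never sees the Task.
missed = server.poll("some-other-task-queue")
# A Worker polling the matching Task Queue picks it up.
picked = server.poll("your-task-queue")
print(missed, picked)
```

The point of the sketch is only that the scheduled Task waits on its Task Queue until something polls that exact queue name.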
Set Task Queue
In most SDKs, the only Workflow Option that must be set is the name of the Task Queue. A Task Queue is a first-in, first-out queue that a Worker Process polls for Tasks.
For any code to execute, a Worker Process must be running that contains a Worker Entity that is polling the same Task Queue name.
- Go
- Java
- PHP
- Python
- TypeScript
Create an instance of StartWorkflowOptions from the go.temporal.io/sdk/client package, set the TaskQueue field, and pass the instance to the ExecuteWorkflow call.
- Type: string
- Default: None, this is a required field to be set by the developer
workflowOptions := client.StartWorkflowOptions{
// ...
TaskQueue: "your-task-queue",
// ...
}
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
Set the Workflow Task Queue with the WorkflowStub instance in the Client code using WorkflowOptions.Builder.setTaskQueue.
- Type: String
- Default: none
//create Workflow stub for YourWorkflowInterface
YourWorkflowInterface workflow1 =
WorkerGreet.greetclient.newWorkflowStub(
YourWorkflowInterface.class,
WorkflowOptions.newBuilder()
.setWorkflowId("YourWF")
// Set the Task Queue
.setTaskQueue(WorkerGreet.TASK_QUEUE)
.build());
In PHP, a Task Queue is represented in code by its name as a string.
There are four places where the name of the Task Queue is supplied by the developer.
- When starting a Workflow, a Task Queue name must be provided in the WorkflowOptions.
// Create new Workflow Options and set the Task Queue
$workflowOptions = WorkflowOptions::new()
->withTaskQueue("Workflow-Task-Queue-1")
// ...
$yourWorkflow = $workflowClient->newWorkflowStub(
YourWorkflowInterface::class,
$workflowOptions
);
$result = $yourWorkflow->workflowMethod();
- A Task Queue name must be provided as a parameter when creating a Worker.
use Temporal\WorkerFactory;
// Create a Worker Factory
$factory = WorkerFactory::create();
// Set the Task Queue when creating the Worker
$worker = $factory->newWorker("Workflow-Task-Queue-1");
// Workflows are stateful. So you need a type to create instances.
$worker->registerWorkflowTypes(YourWorkflow::class);
// start primary loop
$factory->run();
A single Worker can listen to only one Task Queue. The name of the Task Queue that the Worker is listening to must match the name of the Task Queue provided in the options to any given Workflow or Activity.
All Workers listening to the same Task Queue name must be registered to handle the exact same Workflow Types and Activity Types.
If a Worker polls a Task for a Workflow Type or Activity Type it does not know about, it will fail that Task. However, the failure of the Task will not cause the associated Workflow Execution to fail.
- Optionally, the name of a Task Queue can be provided in the ActivityOptions when calling an Activity from a Workflow.
class YourWorkflow implements YourWorkflowInterface
{
private $yourActivity;
public function __construct()
{
// Create Activity options and set the Task Queue
$activityOptions = ActivityOptions::new()
->withTaskQueue("Activity-Task-Queue-1")
// ...
// Create a new Activity Stub and pass the options
$this->yourActivity = Workflow::newActivityStub(
YourActivityInterface::class,
$activityOptions
);
}
public function workflowMethod(): \Generator
{
// Call the Activity
return yield $this->yourActivity->activityMethod();
}
}
If a Task Queue name is not provided in the ActivityOptions, then the Activity Tasks are placed in the same Task Queue as the Workflow Task Queue.
- Optionally, the name of a Task Queue can be provided in the ChildWorkflowOptions when calling a Child Workflow.
//Create new Child Workflow Options and set the Task Queue
$childWorkflowOptions = ChildWorkflowOptions::new()
->withTaskQueue("Child-Workflow-Task-Queue-1")
// ...
// Create a new Child Workflow Stub and set the Task Queue
$childWorkflow = Workflow::newChildWorkflowStub(
ChildWorkflowInterface::class,
$childWorkflowOptions
);
// Call the Child Workflow method
$promise = $childWorkflow->workflowMethod();
If a Task Queue name is not provided in the ChildWorkflowOptions, then the Child Workflow Tasks are placed in the same Task Queue as the Parent Workflow Task Queue.
To set a Task Queue in Python, specify the task_queue argument when executing a Workflow with either the start_workflow() or execute_workflow() method.
result = await client.execute_workflow(
    "your-workflow-name",
    "some arg",
    id="your-workflow-id",
    task_queue="your-task-queue",
)
A Task Queue is a dynamic queue in Temporal polled by one or more Workers.
Workers bundle Workflow code and node modules using Webpack v5 and execute them inside V8 isolates. Activities are directly required and run by Workers in the Node.js environment.
Workers are flexible. You can host any or all of your Workflows and Activities on a Worker, and you can host multiple Workers on a single machine.
There are three main things the Worker needs:
- taskQueue: the Task Queue to poll. This is the only required argument.
- activities: Optional. Imported and supplied directly to the Worker.
- Workflow bundle, specify one of the following options:
  - a workflowsPath to your workflows.ts file to pass to Webpack. For example, require.resolve('./workflows'). Workflows will be bundled with their dependencies.
  - Or pass a prebuilt bundle to workflowBundle, if you prefer to handle the bundling yourself.
import { Worker } from '@temporalio/worker';
import * as activities from './activities';
async function run() {
// Step 1: Register Workflows and Activities with the Worker and connect to
// the Temporal server.
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
activities,
taskQueue: 'hello-world',
});
// Worker connects to localhost by default and uses console.error for logging.
// Customize the Worker by passing more options to create():
// https://typescript.temporal.io/api/classes/worker.Worker
// If you need to configure server connection parameters, see docs:
// /typescript/security#encryption-in-transit-with-mtls
// Step 2: Start accepting tasks on the `hello-world` queue
await worker.run();
}
run().catch((err) => {
console.error(err);
process.exit(1);
});
taskQueue is the only required option; however, use workflowsPath and activities to register Workflows and Activities with the Worker.
When scheduling a Workflow, a taskQueue must be specified.
import { Connection, WorkflowClient } from '@temporalio/client';
// This is the code that is used to start a workflow.
const connection = await Connection.create();
const client = new WorkflowClient({ connection });
const result = await client.execute(yourWorkflow, {
// required
taskQueue: 'your-task-queue',
// required
workflowId: 'your-workflow-id',
});
When creating a Worker, you must pass the taskQueue option to the Worker.create() function.
const worker = await Worker.create({
// imported elsewhere
activities,
taskQueue: 'your-task-queue',
});
Optionally, in Workflow code, when calling an Activity or a Child Workflow, you can specify the Task Queue by passing the taskQueue option to proxyActivities(), startChild(), or executeChild(). If you do not specify a taskQueue, then the TypeScript SDK places Activity and Child Workflow Tasks in the same Task Queue as the Workflow Task Queue.
Workflow Id
Although it is not required, we recommend providing your own Workflow Id. A Workflow Id is a customizable, application-level identifier for a Workflow Execution that is unique to an Open Workflow Execution within a Namespace.
- Go
- Java
- PHP
- Python
- TypeScript
Create an instance of StartWorkflowOptions from the go.temporal.io/sdk/client package, set the ID field, and pass the instance to the ExecuteWorkflow call.
- Type: string
- Default: System generated UUID
workflowOptions := client.StartWorkflowOptions{
// ...
ID: "Your-Custom-Workflow-Id",
// ...
}
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
Set the Workflow Id with the WorkflowStub instance in the Client code using WorkflowOptions.Builder.setWorkflowId.
- Type: String
- Default: none
//create Workflow stub for YourWorkflowInterface
YourWorkflowInterface workflow1 =
WorkerGreet.greetclient.newWorkflowStub(
YourWorkflowInterface.class,
WorkflowOptions.newBuilder()
// Set the Workflow Id
.setWorkflowId("YourWF")
.setTaskQueue(WorkerGreet.TASK_QUEUE)
.build());
The following code example takes the userID as an input and uses it to start the Workflow. The userID is used as the Workflow Id. You can use this Id to cancel your Workflow later.
#[WorkflowInterface]
interface SubscriptionWorkflowInterface
{
#[WorkflowMethod]
public function subscribe(string $userID);
}
The following code example uses the input parameter userID as the Workflow Id.
#[WorkflowInterface]
interface SubscriptionWorkflowInterface
{
#[WorkflowMethod]
public function subscribe(
string $userID
);
}
You can also set the Workflow Id as a constant, for example:
public const WORKFLOW_ID = 'Your-Workflow-Id';
To set a Workflow Id in Python, specify the id argument when executing a Workflow with either the start_workflow() or execute_workflow() method.
The id argument should be a unique identifier for the Workflow Execution.
result = await client.execute_workflow(
    "your-workflow-name",
    "some arg",
    id="your-workflow-id",
    task_queue="your-task-queue",
)
Call client.start() on a Workflow Client with your Workflow function and any arguments. Then specify your taskQueue and set your workflowId to a meaningful business identifier.
const handle = await client.start(example, {
workflowId: 'yourWorkflowId',
taskQueue: 'yourTaskQueue',
args: ['your', 'arg', 'uments'],
});
This starts a new Workflow Execution with the given Workflow Id, Task Queue name, and arguments.
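As a rough illustration of a "meaningful business identifier", a Workflow Id can be derived from a business entity and its key. The helper below is a hypothetical sketch: `build_workflow_id` and the `entity-identifier` naming scheme are assumptions made for this example, not a convention required by Temporal.

```python
def build_workflow_id(entity: str, identifier: str) -> str:
    """Join a business entity name and its identifier into one Workflow Id string."""
    if not entity or not identifier:
        # An empty part would produce an ambiguous Id, so reject it early.
        raise ValueError("entity and identifier must be non-empty")
    return f"{entity}-{identifier}"


# Hypothetical business entity ("order") and identifier ("12345").
workflow_id = build_workflow_id("order", "12345")
print(workflow_id)
```

The resulting string would then be passed as the Workflow Id option of whichever SDK you use (for example, workflowId in TypeScript or id in Python).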
Get Workflow results
If the call to start a Workflow Execution is successful, you will gain access to the Workflow Execution's Run Id.
The Workflow Id, Run Id, and Namespace may be used to uniquely identify a Workflow Execution in the system and get its result.
It's possible to either block progress on the result (synchronous execution) or get the result at some other point in time (asynchronous execution).
In the Temporal Platform, it's also acceptable to use Queries as the preferred method for accessing the state and results of Workflow Executions.
- Go
- Java
- PHP
- Python
- TypeScript
The ExecuteWorkflow call returns an instance of WorkflowRun, which is the workflowRun variable in the following line.
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, app.YourWorkflowDefinition, param)
if err != nil {
// ...
}
// ...
}
The instance of WorkflowRun has the following three methods:
- GetWorkflowID(): Returns the Workflow Id of the invoked Workflow Execution.
- GetRunID(): Always returns the Run Id of the initial Run (See Continue As New) in the series of Runs that make up the full Workflow Execution.
- Get(): Takes a pointer as a parameter and populates the associated variable with the Workflow Execution result.
To wait on the result of a Workflow Execution in the same process that invoked it, call Get() on the instance of WorkflowRun that is returned by the ExecuteWorkflow() call.
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition, param)
if err != nil {
// ...
}
var result YourWorkflowResponse
err = workflowRun.Get(context.Background(), &result)
if err != nil {
// ...
}
// ...
}
However, the result of a Workflow Execution can be obtained from a completely different process. All that is needed is the Workflow Id. (A Run Id is optional if more than one closed Workflow Execution has the same Workflow Id.) The result of the Workflow Execution is available for as long as the Workflow Execution Event History remains in the system.
Call the GetWorkflow() method on an instance of the Go SDK Client and pass it the Workflow Id used to spawn the Workflow Execution.
Then call the Get() method on the instance of WorkflowRun that is returned, passing it a pointer to populate the result.
// ...
workflowID := "Your-Custom-Workflow-Id"
// An empty Run Id targets the most recent Run for this Workflow Id.
workflowRun := c.GetWorkflow(context.Background(), workflowID, "")
var result YourWorkflowResponse
err = workflowRun.Get(context.Background(), &result)
if err != nil {
// ...
}
// ...
Get last completion result
In the case of a Temporal Cron Job, you might need to get the result of the previous Workflow Run and use it in the current Workflow Run. A Temporal Cron Job is the series of Workflow Executions that occur when a Cron Schedule is provided in the call to spawn a Workflow Execution.
To do this, use the HasLastCompletionResult and GetLastCompletionResult APIs, available from the go.temporal.io/sdk/workflow package, directly in your Workflow code.
type CronResult struct {
Count int
}
func YourCronWorkflowDefinition(ctx workflow.Context) (CronResult, error) {
count := 1
if workflow.HasLastCompletionResult(ctx) {
var lastResult CronResult
if err := workflow.GetLastCompletionResult(ctx, &lastResult); err == nil {
count = count + lastResult.Count
}
}
newResult := CronResult {
Count: count,
}
return newResult, nil
}
This will work even if one of the cron Workflow Runs fails. The next Workflow Run gets the result of the last successfully Completed Workflow Run.
A synchronous Workflow Execution blocks your client thread until the Workflow Execution completes (or fails), and then returns the result (or the error, in case of failure).
The following example is a type-safe approach for getting the results of a synchronous Workflow Execution.
FileProcessingWorkflow workflow = client.newWorkflowStub(
FileProcessingWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setTaskQueue(taskQueue)
.build());
// start sync and wait for results (or failure)
String result = workflow.processfile(new Argument());
An asynchronous Workflow Execution immediately returns a value to the caller.
The following examples show how to get the results of a Workflow Execution through typed and untyped WorkflowStub.
Typed WorkflowStub Example
// create typed Workflow stub
FileProcessingWorkflow workflow = client.newWorkflowStub(FileProcessingWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(taskQueue)
.setWorkflowId(workflowId)
.build());
// use WorkflowClient.execute to return a future that contains the Workflow result or failure, or
// use WorkflowClient.start to return the WorkflowId and RunId of the started Workflow.
WorkflowClient.start(workflow::greetCustomer);
Untyped WorkflowStub Example
WorkflowStub untyped = client.newUntypedWorkflowStub("FileProcessingWorkflow",
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setTaskQueue(taskQueue)
.build());
// blocks until Workflow Execution has been started (not until it completes)
untyped.start(argument);
If you need to wait for a Workflow Execution to complete after an asynchronous start, the most straightforward way is to call the blocking Workflow instance again.
Note that if WorkflowOptions.WorkflowIdReusePolicy is not set to AllowDuplicate, then instead of throwing DuplicateWorkflowException, it reconnects to an existing Workflow and waits for its completion.
The following example shows how to do this from a different process than the one that started the Workflow Execution.
YourWorkflow workflow = client.newWorkflowStub(YourWorkflow.class, workflowId);
// Returns the result after waiting for the Workflow to complete.
String result = workflow.yourMethod();
Another way to connect to an existing Workflow and wait for its completion from another process is to use an untyped WorkflowStub. For example:
WorkflowStub workflowStub = client.newUntypedWorkflowStub(workflowType, workflowOptions);
// Returns the result after waiting for the Workflow to complete.
String result = workflowStub.getResult(String.class);
Get last (successful) completion result
For a Temporal Cron Job, get the result of previous successful runs using Workflow.getLastCompletionResult().
The method returns null if there is no previous completion.
The following example shows how to implement this in a Workflow.
public String cronWorkflow() {
String lastProcessedFileName = Workflow.getLastCompletionResult(String.class);
// Process work starting from the lastProcessedFileName.
// Business logic implementation goes here.
// Updates lastProcessedFileName to the new value.
return lastProcessedFileName;
}
Note that this works even if one of the Cron schedule runs failed. The next schedule will still get the last successful result if it ever successfully completed at least once. For example, for a daily cron Workflow, if the run succeeds on the first day and fails on the second day, then the third day run will get the result from first day's run using these APIs.
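The daily cron scenario described above can be worked through with a small stand-alone model. This is an illustrative sketch only: `last_completion_result` is a hypothetical helper written for this example, and it models the described behavior rather than calling any Temporal SDK API.

```python
from typing import Optional


def last_completion_result(run_results: list) -> Optional[str]:
    """Return the result of the most recent successful run, or None.

    Each entry is the result string of a successful run, or None for a failed run.
    """
    # Walk backwards so the most recent successful run wins.
    for result in reversed(run_results):
        if result is not None:
            return result
    return None


# Day 1 succeeded and day 2 failed, so the day 3 run would see day 1's result.
previous_runs = ["day-1-result", None]
seen_on_day_three = last_completion_result(previous_runs)
print(seen_on_day_three)
```

If no run has ever completed successfully, the helper returns None, which mirrors the null return described for the Java API.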
If you need to wait for the completion of a Workflow after an asynchronous start, make a blocking call to the WorkflowRun->getResult method.
$run = $workflowClient->start($accountTransfer, 'fromID', 'toID', 'refID', 1000);
var_dump($run->getResult());
Use start_workflow() or get_workflow_handle() to return a Workflow handle.
Then use the result() method to await the result of the Workflow.
handle = await client.start_workflow(
    YourWorkflow.run, "some arg", id="your-workflow-id", task_queue="your-task-queue"
)
# Wait for result
result = await handle.result()
print(f"Result: {result}")
To get a handle for an existing Workflow by its Id, you can use get_workflow_handle(), or use get_workflow_handle_for() for type safety.
Then use describe() to get the current status of the Workflow.
If the Workflow does not exist, this call fails.
To return the results of a Workflow Execution:
return (
'Completed '
+ wf.workflowInfo().workflowId
+ ', Total Charged: '
+ totalCharged
);
totalCharged is just a function declared in your code. For a full example, see subscription-workflow-project-template-typescript/src/workflows.ts.
A Workflow function may return a result. If it doesn’t (in which case the return type is Promise<void>), the result will be undefined.
If you started a Workflow with client.start(), you can choose to wait for the result anytime with handle.result().
const handle = client.getHandle(workflowId);
const result = await handle.result();
Using a Workflow Handle isn't necessary with client.execute().
If you call result() on a Workflow that ended prematurely, it throws a WorkflowFailedError that reflects the reason. We recommend catching that error.
const handle = client.getHandle(workflowId);
try {
const result = await handle.result();
} catch (err) {
if (err instanceof WorkflowFailedError) {
throw new Error('Temporal workflow failed: ' + workflowId, {
cause: err,
});
} else {
throw new Error('error from Temporal workflow ' + workflowId, {
cause: err,
});
}
}