Workflow message passing - Ruby SDK
A Workflow can act like a stateful service that receives messages: Queries, Signals, and Updates. These messages interact with the Workflow via handler methods defined in the Workflow code. Clients use messages to read Workflow state or change its behavior.
See Workflow message passing for a general overview.
Write message handlers
The code that follows is part of a working solution.
Follow these guidelines when writing your message handlers:
- Message handlers are defined as methods on the Workflow class, decorated by calling one of three class methods before defining the handler method:
workflow_query
,workflow_signal
, andworkflow_update
. - These also implicitly create class-methods with the same name as the instance methods for use by callers.
- The parameters and return values of handlers and the main Workflow function must be serializable.
- Prefer single hash/object input parameter to multiple input parameters. Hash/object parameters allow you to add fields without changing the calling signature.
Query handlers
A Query is a synchronous operation that retrieves state from a Workflow Execution. Define as a method:
class GreetingWorkflow < Temporalio::Workflow::Definition
# ...
workflow_query
def languages(input)
# A query handler returns a value: it can inspect but must not mutate the Workflow state.
if input['include_unsupported']
CallGreetingService.greetings.keys.sort
else
@greetings.keys.sort
end
end
# ...
end
Or as an attribute reader:
class GreetingWorkflow < Temporalio::Workflow::Definition
# This is the equivalent of:
# workflow_query
# def language
# @language
# end
workflow_query_attr_reader :language
# ...
end
- The
workflow_query
class method can accept arguments. See the API reference docs:workflow_query
. - A Query handler must not modify Workflow state.
- You can't perform async blocking operations such as executing an Activity in a Query handler.
Signal handlers
A Signal is an asynchronous message sent to a running Workflow Execution to change its state and control its flow:
class GreetingWorkflow < Temporalio::Workflow::Definition
# ...
workflow_signal
def approve(input)
# A signal handler mutates the workflow state but cannot return a value.
@approved_for_release = true
@approver_name = input['name']
end
# ...
end
-
The
workflow_signal
class method can accept arguments. Refer to the API docs:workflow_signal
. -
The handler should not return a value. The response is sent immediately from the server, without waiting for the Workflow to process the Signal.
-
Signal (and Update) handlers can be asynchronous and blocking. This allows you to use Activities, Child Workflows, durable Timers, wait conditions, and more. See Async handlers and Workflow message passing for guidelines on safely using async Signal and Update handlers.
Update handlers and validators
An Update is a trackable synchronous request sent to a running Workflow Execution. It can change the Workflow state, control its flow, and return a result. The sender must wait until the Worker accepts or rejects the Update. The sender may wait further to receive a returned value or an exception if something goes wrong:
class GreetingWorkflow < Temporalio::Workflow::Definition
# ...
workflow_update
def set_language(new_language) # rubocop:disable Naming/AccessorMethodName
# An update handler can mutate the workflow state and return a value.
prev = @language.to_sym
@language = new_language.to_sym
prev
end
workflow_update_validator(:set_language)
def validate_set_language(new_language)
# In an update validator you raise any exception to reject the update.
raise "#{new_language} is not supported" unless @greetings.include?(new_language.to_sym)
end
# ...
end
-
The
workflow_update
class method can take arguments as described in the API reference docs forworkflow_update
. -
About validators:
- Use validators to reject an Update before it is written to History. Validators are always optional. If you don't need to reject Updates, you can skip them.
- Define an Update validator with the
workflow_update_validator
class method invoked before defining the method. The first parameter when declaring the validator is the name of the Update handler method. The validator must accept the same argument types as the handler and should not return a value.
-
Accepting and rejecting Updates with validators:
- To reject an Update, raise an exception of any type in the validator.
- Without a validator, Updates are always accepted.
-
Validators and Event History:
- The
WorkflowExecutionUpdateAccepted
event is written into the History whether the acceptance was automatic or programmatic. - When a Validator raises an error, the Update is rejected, the Update is not run, and
WorkflowExecutionUpdateAccepted
won't be added to the Event History. The caller receives an "Update failed" error.
- The
-
Use
current_update_info
to obtain information about the current Update. This includes the Update ID, which can be useful for deduplication when using Continue-As-New: see Ensuring your messages are processed exactly once. -
Update (and Signal) handlers can be asynchronous and blocking. This allows you to use Activities, Child Workflows, durable Timers, wait conditions, and more. See Async handlers and Workflow message passing for guidelines on safely using async Update and Signal handlers.
Send messages
To send Queries, Signals, or Updates you call methods on a WorkflowHandle
instance.
To obtain the Workflow handle, you can:
- Use
Client#start_workflow
to start a Workflow and return its handle. - Use the
Client#workflow_handle
method to retrieve a Workflow handle by its Workflow Id.
For example:
client = Temporalio::Client.connect('localhost:7233', 'default')
handle = client.start_workflow(
MessagePassingSimple::GreetingWorkflow,
id: 'message-passing-simple-sample-workflow-id',
task_queue: 'message-passing-simple-sample'
)
To check the argument types required when sending messages -- and the return type for Queries and Updates -- refer to the corresponding handler method in the Workflow Definition.
- Temporal does not support Continue-as-New functionality within Update handlers.
- Complete all handlers before using Continue-as-New.
- Use Continue-as-New from your main Workflow Definition method, just as you would complete or fail a Workflow Execution.
Send a Query
Call a Query method with WorkflowHandle#query
:
supported_languages = handle.query(MessagePassingSimple::GreetingWorkflow.languages, { include_unsupported: false })
-
Sending a Query doesn’t add events to a Workflow's Event History.
-
You can send Queries to closed Workflow Executions within a Namespace's Workflow retention period. This includes Workflows that have completed, failed, or timed out. Querying terminated Workflows is not safe and, therefore, not supported.
-
A Worker must be online and polling the Task Queue to process a Query.
Send a Signal
You can send a Signal to a Workflow Execution from a Temporal Client or from another Workflow Execution. However, you can only send Signals to Workflow Executions that haven’t closed.
From a Client
Use WorkflowHandle#signal
from Client code to send a Signal:
handle.signal(MessagePassingSimple::GreetingWorkflow.approve, { name: 'John Q. Approver' })
-
The call returns when the server accepts the Signal; it does not wait for the Signal to be delivered to the Workflow Execution.
-
The WorkflowExecutionSignaled Event appears in the Workflow's Event History.
From a Workflow
A Workflow can send a Signal to another Workflow, known as an External Signal.
In this case you need to obtain a Workflow handle for the external Workflow.
Use Temporalio::Workflow.external_workflow_handle
, passing a running Workflow Id, to retrieve a Workflow handle:
class WorkflowB < Temporalio::Workflow::Definition
def execute
handle = Temporalio::Workflow.external_workflow_handle('workflow-a-id')
handle.signal(WorkflowA.some_signal, 'some signal arg')
end
end
When an External Signal is sent:
- A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
- A WorkflowExecutionSignaled Event appears in the recipient's Event History.
Signal-With-Start
Signal-With-Start allows a Client to send a Signal to a Workflow Execution, starting the Execution if it is not already running. If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.
To use Signal-With-Start, call signal_with_start_workflow
with a WithStartWorkflowOperation
:
client = Temporalio::Client.connect('localhost:7233', 'default')
# Create start-workflow operation for use with signal-with-start
start_workflow_operation = Temporalio::Client::WithStartWorkflowOperation.new(
MyWorkflow, 'my-workflow-input',
id: 'my-workflow-id', task_queue: 'my-workflow-task-queue'
)
# Perform signal-with-start
handle = client.signal_with_start_workflow(
MyWorkflow.my_signal, 'signal-input', start_workflow_operation:
)
Send an Update
An Update is a synchronous, blocking call that can change Workflow state, control its flow, and return a result.
A Client sending an Update must wait until the Server delivers the Update to a Worker. Workers must be available and responsive. If you need a response as soon as the Server receives the request, use a Signal instead. Also note that you can't send Updates to other Workflow Executions.
WorkflowExecutionUpdateAccepted
is added to the Event History when the Worker confirms that the Update passed validation.WorkflowExecutionUpdateCompleted
is added to the Event History when the Worker confirms that the Update has finished.
To send an Update to a Workflow Execution, you can:
-
Call the Update method with
execute_update
from the Workflow handle and wait for the Update to complete. This code fetches an Update result:prev_language = handle.execute_update(MessagePassingSimple::GreetingWorkflow.set_language, :chinese)
-
- Use
start_update
to receive a handle as soon as the Update is accepted. It returns aWorkflowUpdateHandle
- Use this
WorkflowUpdateHandle
later to fetch your results. - Asynchronous Update handlers normally perform long-running async Activities.
start_update
only waits until the Worker has accepted or rejected the Update, not until all asynchronous operations are complete.
For example:
# Start an update and then wait for it to complete
update_handle = handle.start_update(
MessagePassingSimple::GreetingWorkflow.apply_language_with_lookup,
:arabic,
wait_for_stage: Temporalio::Client::WorkflowUpdateWaitStage::ACCEPTED
)
prev_language = update_handle.resultFor more details, see the "Async handlers" section.
- Use
Update-With-Start
In Public Preview in Temporal Cloud.
Minimum Temporal Server version Temporal Server version 1.26
Update-with-Start lets you send an Update that checks whether an already-running Workflow with that ID exists:
- If the Workflow exists, the Update is processed.
- If the Workflow does not exist, a new Workflow Execution is started with the given ID, and the Update is processed before the main Workflow method starts to execute.
Use execute_update_with_start_workflow
to start the Update and wait for the result in one go.
Alternatively, use start_update_with_start_workflow
to start the Update and receive a WorkflowUpdateHandle
, and then use update_handle.result
to retrieve the result from the Update.
These calls return once the requested Update wait stage has been reached, or when the request times out.
- You will need to provide a
WithStartWorkflowOperation
to define the Workflow that will be started if necessary, and its arguments. - You must specify an id_conflict_policy when creating the
WithStartWorkflowOperation
. Note that aWithStartWorkflowOperation
can only be used once.
Here's an example:
client = Temporalio::Client.connect('localhost:7233', 'default')
# Create start-workflow operation for use with update-with-start
start_workflow_operation = Temporalio::Client::WithStartWorkflowOperation.new(
MyWorkflow, 'my-workflow-input',
id: 'my-workflow-id', task_queue: 'my-workflow-task-queue',
id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::USE_EXISTING
)
# Perform update-with-start and get update result
update_result = client.execute_with_start_workflow(
MyWorkflow.my_update, 'update-input', start_workflow_operation:
)
# The workflow handle is on the start operation, here's an example of waiting on
# workflow result
workflow_result = start_workflow_operation.workflow_handle.result
Message handler patterns
This section covers common write operations, such as Signal and Update handlers. It doesn't apply to pure read operations, like Queries or Update Validators.
For additional information, see Inject work into the main Workflow and Ensuring your messages are processed exactly once.
Add async handlers
Signal and Update handlers can be asynchronous as well as blocking. Using asynchronous calls allows you to wait for Activities, Child Workflows, Durable Timers, wait conditions, etc. This expands the possibilities for what can be done by a handler but it also means that handler executions and your main Workflow method are all running concurrently, with switching occurring between them at await calls.
It's essential to understand the things that could go wrong in order to use asynchronous handlers safely. See Workflow message passing for guidance on safe usage of async Signal and Update handlers, and the Controlling handler concurrency and Waiting for message handlers to finish sections below.
The following code is an Activity that simulates a network call to a remote service:
class CallGreetingService < Temporalio::Activity::Definition
def execute(to_language)
# Simulate a network call
sleep(0.2)
# This intentionally returns nil on not found
CallGreetingService.greetings[to_language.to_sym]
end
def self.greetings
@greetings ||= {
arabic: 'مرحبا بالعالم',
chinese: '你好, 世界',
english: 'Hello, world',
french: 'Bonjour, monde',
hindi: 'नमस्ते दुनिया',
portuguese: 'Olá mundo',
spanish: 'Hola mundo'
}
end
end
The following code is a Workflow Update for asynchronous use of the preceding Activity:
class GreetingWorkflow < Temporalio::Workflow::Definition
# ...
workflow_update
def apply_language_with_lookup(new_language)
# Call an activity if it's not there.
unless @greetings.include?(new_language.to_sym)
# We use a mutex so that, if this handler is executed multiple times, each execution
# can schedule the activity only when the previously scheduled activity has
# completed. This ensures that multiple calls to apply_language_with_lookup are
# processed in order.
@apply_language_mutex ||= Mutex.new
@apply_language_mutex.synchronize do
greeting = Temporalio::Workflow.execute_activity(
CallGreetingService, new_language, start_to_close_timeout: 10
)
# The requested language might not be supported by the remote service. If so, we
# raise ApplicationError, which will fail the update. The
# WorkflowExecutionUpdateAccepted event will still be added to history. (Update
# validators can be used to reject updates before any event is written to history,
# but they cannot be async, and so we cannot use an update validator for this
# purpose.)
raise Temporalio::Error::ApplicationError, "Greeting service does not support #{new_language}" unless greeting
@greetings[new_language.to_sym] = greeting
end
end
set_language(new_language)
end
end
After updating the code for asynchronous calls, your Update handler can schedule an Activity and await the result. Although an async Signal handler can initiate similar network tasks, using an Update handler allows the Client to receive a result or error once the Activity completes. This lets your Client track the progress of asynchronous work performed by the Update's Activities, Child Workflows, etc.
Use wait conditions
Sometimes, async Signal or Update handlers need to meet certain conditions before they should continue.
Using a wait condition with wait_condition
sets a function that prevents the code from proceeding until the condition is truthy.
This is an important feature that helps you control your handler logic.
Here are two important use cases for wait_condition
:
- Waiting in a handler until it is appropriate to continue.
- Waiting in the main Workflow until all active handlers have finished.
The condition state you're waiting for can be updated by and reflect any part of the Workflow code. This includes the main Workflow method, other handlers, or child coroutines spawned by the main Workflow method, and so forth.
In handlers
Sometimes, async Signal or Update handlers need to meet certain conditions before they should continue.
Using a wait condition with wait_condition
sets a function that prevents the code from proceeding until the condition is truthy.
This is an important feature that helps you control your handler logic.
Consider a ready_for_update_to_execute
method that runs before your Update handler executes.
The wait_condition
call waits until your condition is met:
workflow_update
def my_update(my_update_input)
Temporalio::Workflow.wait_condition { ready_for_update_to_execute(my_update_input) }
# ...
end
Remember: Handlers can execute before the main Workflow method starts.
Before finishing the Workflow
Workflow wait conditions can ensure your handler completes before a Workflow finishes.
When your Workflow uses async Signal or Update handlers, your main Workflow method can return or continue-as-new while a handler is still waiting on an async task, such as an Activity result.
The Workflow completing may interrupt the handler before it finishes crucial work and cause Client errors when trying retrieve Update results.
Use Temporalio::Workflow.all_handlers_finished?
to address this problem and allow your Workflow to end smoothly:
class MyWorkflow < Temporalio::Workflow::Definition
def execute
# ...
Temporalio::Workflow.wait_condition { Temporalio::Workflow.all_handlers_finished? }
'workflow-result'
end
end
By default, your Worker will log a warning when you allow a Workflow Execution to finish with unfinished handler executions.
You can silence these warnings on a per-handler basis by passing the unfinished_policy
argument to the workflow_signal
/ workflow_update
class methods:
workflow_update unfinished_policy: Temporalio::Workflow::HandlerUnfinishedPolicy::ABANDON
def my_update
# ...
See Finishing handlers before the Workflow completes for more information.
Use workflow_init to access input early
The workflow_init
class method above initialize
gives it access to Workflow input.
When you use the workflow_init
on your constructor, you give the constructor the same Workflow parameters as your execute
method.
The SDK will then ensure that your constructor receives the Workflow input arguments that the Client sent.
The Workflow input arguments are also passed to your execute
method -- that always happens, whether or not you use the workflow_init
class method above initialize
.
Here's an example.
The constructor and execute
must have the same parameters with the same types:
class WorkflowInitWorkflow < Temporalio::Workflow::Definition
workflow_init
def initialize(input)
@name_with_title = "Sir #{input['name']}"
end
def execute(input)
Temporalio::Workflow.wait_condition { @title_has_been_checked }
"Hello, #{@name_with_title}"
end
workflow_update
def check_title_validity
# The handler is now guaranteed to see some workflow input since it was
# processed by the constructor
valid = Temporalio::Workflow.execute_activity(
CheckTitleValidityActivity,
@name_with_title,
start_to_close_timeout: 100
)
@title_has_been_checked = true
valid
end
end
Use locks to prevent concurrent handler execution
Concurrent processes can interact in unpredictable ways. Incorrectly written concurrent message-passing code may not work correctly when multiple handler instances run simultaneously. Here's an example of a pathological case:
class MyWorkflow < Temporalio::Workflow::Definition
# ...
workflow_signal
def bad_handler
data = Temporalio::Workflow.execute_activity(
FetchDataActivity,
start_to_close_timeout: 100
)
@x = data['x']
# 🐛🐛 Bug!! If multiple instances of this handler are executing concurrently, then
# there may be times when the Workflow has @x from one Activity execution and @y
# from another.
Temporalio::Workflow.sleep(1)
@y = data['y']
end
end
Coordinating access with Mutex
, a mutual exclusion lock, corrects this code.
Locking makes sure that only one handler instance can execute a specific section of code at any given time:
class MyWorkflow < Temporalio::Workflow::Definition
# ...
workflow_signal
def safe_handler
@mutex ||= Mutex.new
@mutex.synchronize do
data = Temporalio::Workflow.execute_activity(
FetchDataActivity,
start_to_close_timeout: 100
)
@x = data['x']
# 🐛🐛 Bug!! If multiple instances of this handler are executing concurrently, then
# there may be times when the Workflow has @x from one Activity execution and @y
# from another.
Temporalio::Workflow.sleep(1)
@y = data['y']
end
end
end
For additional concurrency options, wait_condition
can be used to do more advanced things such as using an integer
attribute + wait_condition
as a semaphore.
Troubleshooting
When sending a Signal, Update, or Query to a Workflow, your Client might encounter the following errors:
-
The Client can't contact the server: You'll receive a
Temporalio::Error::RPCError
exception whosecode
is anUNAVAILABLE
constant defined inCode
(after some retries). -
The Workflow does not exist: You'll receive a
Temporalio::Error::RPCError
exception whosecode
is aNOT_FOUND
constant defined inCode
.
See Exceptions in message handlers for a non–Ruby-specific discussion of this topic.
Signal issues
When using Signal, the only exception that will result from your requests during its execution is RPCError
.
All handlers may experience additional exceptions during the initial (pre-Worker) part of a handler request lifecycle.
For Queries and Updates, the Client waits for a response from the Worker. If an issue occurs during the handler Execution by the Worker, the Client may receive an exception.
Update issues
When working with Updates, you may encounter these errors:
-
No Workflow Workers are polling the Task Queue: Your request will be retried by the SDK Client indefinitely. Use a
Cancellation
in your RPC options to cancel the Update. This raises a WorkflowUpdateRPCTimeoutOrCanceledError exception . -
Update failed: You'll receive a
WorkflowUpdateFailedError
exception. There are two ways this can happen:-
The Update was rejected by an Update validator defined in the Workflow alongside the Update handler.
-
The Update failed after having been accepted.
Update failures are like Workflow failures. Issues that cause a Workflow failure in the main method also cause Update failures in the Update handler. These might include:
- A failed Child Workflow
- A failed Activity (if the Activity retries have been set to a finite number)
- The Workflow author raising
ApplicationError
- Any error listed in
workflow_failure_exception_types
on the Worker orworkflow_failure_exception_type
on the Workflow (empty by default)
-
-
The handler caused the Workflow Task to fail: A Workflow Task Failure causes the server to retry Workflow Tasks indefinitely. What happens to your Update request depends on its stage:
- If the request hasn't been accepted by the server, you receive a
FAILED_PRECONDITION
Temporalio::Error::RPCError
exception. - If the request has been accepted, it is durable.
Once the Workflow is healthy again after a code deploy, use an
WorkflowUpdateHandle
to fetch the Update result.
- If the request hasn't been accepted by the server, you receive a
-
The Workflow finished while the Update handler execution was in progress: You'll receive a
Temporalio::Error::RPCError
"workflow execution already completed".This will happen if the Workflow finished while the Update handler execution was in progress, for example because
-
The Workflow was canceled or failed.
-
The Workflow completed normally or continued-as-new and the Workflow author did not wait for handlers to be finished.
-
Query issues
When working with Queries, you may encounter these errors:
-
There is no Workflow Worker polling the Task Queue: You'll receive a
Temporalio::Error::RPCError
exception whosecode
is aFAILED_PRECONDITION
constant defined inCode
. -
Query failed: You'll receive a
WorkflowQueryFailedError
exception if something goes wrong during a Query. Any exception in a Query handler will trigger this error. This differs from Signal and Update requests, where exceptions can lead to Workflow Task Failure instead. -
The handler caused the Workflow Task to fail. This would happen, for example, if the Query handler blocks the thread for too long without yielding.
Dynamic handlers
Temporal supports Dynamic Queries, Signals, Updates, Workflows, and Activities. These are unnamed handlers that are invoked if no other statically defined handler with the given name exists.
Dynamic Handlers provide flexibility to handle cases where the names of Queries, Signals, Updates, Workflows, or Activities, aren't known at run time.
Dynamic Handlers should be used judiciously as a fallback mechanism rather than the primary approach. Overusing them can lead to maintainability and debugging issues down the line.
Instead, Signals, Queries, Workflows, or Activities should be defined statically whenever possible, with clear names that indicate their purpose. Use static definitions as the primary way of structuring your Workflows.
Reserve Dynamic Handlers for cases where the handler names are not known at compile time and need to be looked up dynamically at runtime. They are meant to handle edge cases and act as a catch-all, not as the main way of invoking logic.
Dynamic Query
A Dynamic Query in Temporal is a Query method that is invoked dynamically at runtime if no other Query with the same name is registered.
A Query can be made dynamic by setting dynamic
to true
on the workflow_query
class method.
Only one Dynamic Query can be present on a Workflow.
The Query Handler parameters must accept a string name as the first parameter. Often users set raw_args
to true
and set the second parameter as *args
which will be an array of Temporalio::Converters::RawValue
.
The Temporalio::Workflow.payload_converter property is used to convert the raw value instances to proper types.
workflow_query dynamic: true, raw_args: true
def dynamic_query(query_name, *args)
first_param = Temporalio::Workflow.payload_converter.from_payload(
args.first || raise 'Missing first parameter'
)
"Got parameter #{first_param} for query #{query_name}"
end
Dynamic Signal
A Dynamic Signal in Temporal is a Signal that is invoked dynamically at runtime if no other Signal with the same input is registered.
A Signal can be made dynamic by setting dynamic
to true
on the workflow_signal
class method.
Only one Dynamic Signal can be present on a Workflow.
The Signal Handler parameters must accept a string name as the first parameter. Often users set raw_args
to true
and set the second parameter as *args
which will be an array of Temporalio::Converters::RawValue
.
The Temporalio::Workflow.payload_converter property is used to convert the raw value instances to proper types.
workflow_signal dynamic: true, raw_args: true
def dynamic_signal(signal_name, *args)
first_param = Temporalio::Workflow.payload_converter.from_payload(
args.first || raise 'Missing first parameter'
)
@pending_things << "Got parameter #{first_param} for signal #{signal_name}"
end
Dynamic Update
A Dynamic Update in Temporal is an Update that is invoked dynamically at runtime if no other Update with the same input is registered.
An Update can be made dynamic by setting dynamic
to true
on the workflow_update
class method.
Only one Dynamic Update can be present on a Workflow.
The Query Handler parameters must accept a string name as the first parameter. Often users set raw_args
to true
and set the second parameter as *args
which will be an array of Temporalio::Converters::RawValue
.
The Temporalio::Workflow.payload_converter property is used to convert the raw value instances to proper types.
workflow_update dynamic: true, raw_args: true
def dynamic_update(update_name, *args)
first_param = Temporalio::Workflow.payload_converter.from_payload(
args.first || raise 'Missing first parameter'
)
@pending_things << "Got parameter #{first_param} for update #{update_name}"
end