
Iterating Over Elements

The ForEach operation lets a workflow process a collection of elements by executing a sub-workflow for each element in parallel [1]. It is useful whenever the same workflow logic must be applied to multiple items.

The ForEach operation takes a collection of elements and embeds a sub-workflow execution for each element. The ForEach step completes when all sub-workflows have finished.
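The completion rule described above can be illustrated with a small toy model. This is only a sketch in plain Scala: `ForEachModel`, `SubState`, `start`, `advance`, and `completed` are hypothetical names invented for illustration and are not part of the library. Each element gets its own state, progress is applied to one element at a time, and the step as a whole counts as complete only when every element has finished.

```scala
// Toy model only: none of these names come from the workflow library.
object ForEachModel {
  // State of a single element's sub-workflow: a countdown of remaining steps.
  final case class SubState(remainingSteps: Int) {
    def finished: Boolean = remainingSteps <= 0
  }

  // One sub-workflow state per element, all created up front.
  def start[E](elements: Set[E], stepsPerElement: Int): Map[E, SubState] =
    elements.map(e => e -> SubState(stepsPerElement)).toMap

  // Advance exactly one element by one step (for example a timer firing or a signal arriving).
  // Elements that are unknown or already finished are left untouched.
  def advance[E](states: Map[E, SubState], elem: E): Map[E, SubState] =
    states.get(elem) match {
      case Some(s) if !s.finished => states.updated(elem, SubState(s.remainingSteps - 1))
      case _                      => states
    }

  // The ForEach step as a whole is done only when every sub-workflow is done.
  def completed[E](states: Map[E, SubState]): Boolean =
    states.values.forall(_.finished)
}
```

In this model, elements make progress independently and in any order, but nothing runs on separate threads: each call to `advance` handles a single event for a single element.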

Usage Example

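ForEach is one of the more complex elements and requires configuring several aspects: the source of elements, the sub-workflow and its initial state, event embedding, interim state handling, output construction, and signal routing: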

// Placeholder types for the elements and for the input of the ForEach step
type Element
type Input

// Sub-workflow executed once per element
def elementWorkflow: SubWorkflowContext.WIO[Element, Nothing, SubWorkflowState] =
  SubWorkflowContext.WIO.pure(SubWorkflowState()).autoNamed
// Elements to iterate over, derived from the input
def getElements(input: Input): Set[Element] = ???
// Initial state of each element's sub-workflow
def initialElementState: SubWorkflowState = ???
// Interim state before any sub-workflow has progressed
def initialInterimState(input: Input): MyState = ???
// Folds one element's sub-workflow state into the interim state
def updateInterimState(elem: Element, elemState: SubWorkflowState, state: MyState): MyState = ???
// Final output, built from the results of all elements
def buildOutput(input: Input, results: Map[Element, SubWorkflowState]): MyState = ???

// Converts sub-workflow events, tagged with their element, to and from the parent event type
def eventEmbedding: WorkflowEmbedding.Event[(Element, SubWorkflowEvent), MyEventBase] =
  new WorkflowEmbedding.Event[(Element, SubWorkflowEvent), MyEventBase] {
    override def convertEvent(e: (Element, SubWorkflowEvent)): MyEventBase             = ???
    override def unconvertEvent(e: MyEventBase): Option[(Element, SubWorkflowEvent)] = ???
  }

// Routes incoming signals to the sub-workflow of a specific element
def signalRouter: SignalRouter.Receiver[Element, MyState] = SimpleSignalRouter[Element]()

val forEachStep = WIO
  .forEach[Input](getElements)
  .execute[SubWorkflowContext.Ctx](elementWorkflow, initialElementState)
  .withEventsEmbeddedThrough(eventEmbedding)
  .withInterimState(initialInterimState)
  .incorporatingChangesThrough(updateInterimState)
  .withOutputBuiltWith(buildOutput)
  .withSignalsWrappedWith(signalRouter)
  .autoNamed()
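To make the `???` placeholders more concrete, here is one possible set of implementations in plain Scala. This is a sketch under assumptions: the domain types (`Element` as an order, `Input` as a list of order ids, `SubWorkflowState` with a `shipped` flag, `MyState` with a running set) are hypothetical and chosen only for illustration. The function signatures mirror the ones in the example; the bodies are not prescribed by the library.

```scala
// Hypothetical domain types; the example above leaves them abstract.
object ForEachCallbacks {
  final case class Element(orderId: String)
  final case class Input(orderIds: List[String])
  final case class SubWorkflowState(shipped: Boolean)
  final case class MyState(total: Int, shippedSoFar: Set[Element])

  // One element per distinct order id.
  def getElements(input: Input): Set[Element] =
    input.orderIds.map(Element(_)).toSet

  // Every element sub-workflow starts out as "not shipped".
  def initialElementState: SubWorkflowState = SubWorkflowState(shipped = false)

  // Interim state before any sub-workflow has made progress.
  def initialInterimState(input: Input): MyState =
    MyState(total = getElements(input).size, shippedSoFar = Set.empty)

  // Fold one element's latest state into the interim state.
  def updateInterimState(elem: Element, elemState: SubWorkflowState, state: MyState): MyState =
    if (elemState.shipped) state.copy(shippedSoFar = state.shippedSoFar + elem)
    else state.copy(shippedSoFar = state.shippedSoFar - elem)

  // Final output, built once the results of all elements are available.
  def buildOutput(input: Input, results: Map[Element, SubWorkflowState]): MyState =
    MyState(
      total = results.size,
      shippedSoFar = results.collect { case (e, s) if s.shipped => e }.toSet
    )
}
```

Keeping `updateInterimState` a pure function of the element, its state, and the previous interim state makes it straightforward to test in isolation, independently of the workflow runtime.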

Draft Mode

For quick prototyping, you can use the draft API:

val subWorkflow  = WIO.draft.step()
val forEachDraft = WIO.draft.forEach(subWorkflow)

Signals

ForEach operations support signal routing, so that a signal can be delivered to the sub-workflow of a specific element. This is exposed through the SignalRouter abstraction, which is split into two parts:

  • SignalRouter.Receiver, required when building the WIO.ForEach
  • SignalRouter.Sender, required to send the signal

Typically, both are exposed by the same object.

Signal Router Types

SimpleSignalRouter

Use SimpleSignalRouter when you have direct access to an element on the sender side.

val signalRouter = SimpleSignalRouter[Element]

def element: Element = ???
workflowInstance.deliverRoutedSignal(signalRouter, element, signalDef, request)

BasicSignalRouter

For more complex routing scenarios, you can extend BasicSignalRouter to extract elements from sub-workflow state using custom keys:

type Key
object SigRouter extends BasicSignalRouter[Key, Element, SubWorkflowState] {
  override def extractElem(state: SubWorkflowState, key: Key): Option[Element] = ???
}

def key: Key = ???
workflowInstance.deliverRoutedSignal(SigRouter, key, signalDef, request)
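As an illustration of what the body of `extractElem` might look like, the sketch below resolves a key to an element through a lookup table stored in the state. All types here (`Element`, `Key`, and the shape of `SubWorkflowState` with its `byTrackingNumber` field) are assumptions made for the example. The function is written standalone so that it does not depend on the library; only its signature follows the snippet above.

```scala
// Standalone sketch; the types and the lookup-table layout are hypothetical.
object ExtractElemSketch {
  final case class Element(orderId: String)
  final case class Key(trackingNumber: String)
  // Assumed state shape: a table from tracking numbers to the elements they belong to.
  final case class SubWorkflowState(byTrackingNumber: Map[String, Element])

  // Resolve the key to an element, or None when the key is unknown.
  def extractElem(state: SubWorkflowState, key: Key): Option[Element] =
    state.byTrackingNumber.get(key.trackingNumber)
}
```

Returning `None` for an unknown key keeps the lookup total; how an unroutable signal is then handled is up to the router and the runtime, and is not covered by this sketch.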

Considerations

  • Concurrency - All element sub-workflows run concurrently by default. Consider the resource implications when processing large collections.
  • Error Handling - If any element sub-workflow fails, the entire ForEach operation fails. Use appropriate error handling within your element workflows for resilience.
  • State Accumulation - The interim state mechanism allows you to collect and aggregate results as sub-workflows progress, rather than waiting for all to finish before processing results.
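
The error-handling consideration above can be sketched with a toy model in plain Scala. `Either` stands in for the outcome of an element sub-workflow; this is an assumption made for illustration, not the library's error type, and `FailFastModel`, `combine`, and `recover` are hypothetical names. The model shows that a single failed element fails the combined result, and that handling the failure at the element level lets the combined result succeed.

```scala
// Toy model only; Either stands in for a sub-workflow outcome.
object FailFastModel {
  // Combine per-element results: any Left makes the whole result a Left.
  def combine[E, Err, A](results: Map[E, Either[Err, A]]): Either[Err, Map[E, A]] =
    results.foldLeft[Either[Err, Map[E, A]]](Right(Map.empty[E, A])) {
      case (acc, (elem, res)) =>
        for {
          done  <- acc
          value <- res
        } yield done.updated(elem, value)
    }

  // Element-level handling: turn a failure into a fallback value.
  def recover[Err, A](res: Either[Err, A], fallback: A): Either[Err, A] =
    res match {
      case Left(_) => Right(fallback)
      case ok      => ok
    }
}
```

For example, combining `Map("a" -> Right(1), "b" -> Left("boom"))` yields a `Left`, while applying `recover` to each element first yields a `Right` containing a value for every element.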

Footnotes

  1. Here, "parallel" means that timers and signals are processed concurrently in the workflow engine. However, this is not thread-level parallelism—no true multi-threading or asynchronous execution is involved.