Wake-Ups
The KnockerUpper is responsible for waking up a workflow when needed, for example:
- When a workflow is awaiting a specific time to pass.
- When a workflow has timer-based interruptions.
This component ensures workflows resume execution at the right moment.
Available Implementations
Below is an overview of the available KnockerUpper implementations, with their strengths and limitations:
Implementation | Description | Good For | Bad For |
---|---|---|---|
NoOpKnockerUpper | Does nothing; ignores all registered wake-ups. | | |
SleepingKnockerUpper | In-memory implementation based on IO.sleep; non-persistent. | | |
FilesystemKnockerUpper | Relies on the filesystem and stores one file per workflow for scheduling wake-ups. | | |
QuartzKnockerUpper | Integrates with Quartz for sophisticated scheduling capabilities. | | |
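To make the "in-memory, based on IO.sleep" idea more concrete, here is a minimal sketch of how such a scheduler could work with cats-effect. It is not the library's SleepingKnockerUpper; the SleepScheduler class, its update method, and the wakeUp callback are hypothetical names chosen for illustration. Each scheduled wake-up is a fiber that sleeps until the target instant, and rescheduling cancels the previous fiber.

```scala
import cats.effect.{FiberIO, IO, Ref}
import cats.syntax.all.*
import java.time.Instant
import scala.concurrent.duration.*

// Hypothetical, simplified scheduler; NOT the library's SleepingKnockerUpper.
final class SleepScheduler[Id](
    fibers: Ref[IO, Map[Id, FiberIO[Unit]]], // pending sleeps, one per workflow id
    wakeUp: Id => IO[Unit]                   // invoked when a sleep elapses
) {

  // Replaces any pending wake-up for `id`; passing `None` only cancels it.
  def update(id: Id, at: Option[Instant]): IO[Unit] =
    for {
      previous <- fibers.modify(m => (m - id, m.get(id)))
      _        <- previous.traverse_(_.cancel)
      _        <- at.traverse_ { instant =>
                    for {
                      now   <- IO.realTimeInstant
                      // Instants in the past fire immediately.
                      delay  = java.time.Duration.between(now, instant).toMillis.max(0L).millis
                      fiber <- (IO.sleep(delay) >> wakeUp(id)).start
                      _     <- fibers.update(_ + (id -> fiber))
                    } yield ()
                  }
    } yield ()
}
```

Because all state lives in the Ref, pending wake-ups are lost when the process stops, which is what "non-persistent" means in the table. In this sketch, completed fibers also stay in the map until the same id is updated again.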
Design Overview
The overall design of KnockerUpper splits responsibilities across two interfaces:
- KnockerUpper.Agent: Responsible for managing wake-up schedules. This includes adding, updating, and removing wake-up times for workflows.
- KnockerUpper.Process: Responsible for starting the background process that triggers wake-ups.
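The split between the two roles can be sketched with a toy model. The traits below are hypothetical simplifications written in plain Scala; the real interfaces are effectful and their method names and signatures may differ. The toy implementation is driven by explicit tick calls instead of a background process, purely to keep the example deterministic.

```scala
import java.time.Instant
import scala.collection.mutable

// Hypothetical, simplified interfaces; the library's real ones may differ.
trait Agent[Id] {
  // Some(instant) schedules or replaces a wake-up; None removes it.
  def updateWakeup(id: Id, at: Option[Instant]): Unit
}

trait Process[Id] {
  // Registers the callback that is invoked for every due workflow.
  def initialize(wakeUp: Id => Unit): Unit
}

// Toy implementation of both roles, driven by explicit `tick` calls.
final class InMemoryKnockerUpper[Id] extends Agent[Id] with Process[Id] {
  private val schedule = mutable.Map.empty[Id, Instant]
  private var callback: Id => Unit = _ => ()

  def updateWakeup(id: Id, at: Option[Instant]): Unit =
    at match {
      case Some(instant) => schedule.update(id, instant)
      case None          => schedule.remove(id)
    }

  def initialize(wakeUp: Id => Unit): Unit = callback = wakeUp

  // Fires and clears every wake-up scheduled at or before `now`.
  def tick(now: Instant): Unit = {
    val due = schedule.collect { case (id, at) if !at.isAfter(now) => id }.toList
    due.foreach { id =>
      schedule.remove(id)
      callback(id)
    }
  }
}
```

In this model, the code that runs workflows only needs the Agent side to record when a workflow wants to be woken, while the Process side is wired up once at startup with the function that actually resumes a workflow.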
Usage Examples
All runtimes take a KnockerUpper.Agent as an argument, so any KnockerUpper can be used with any WorkflowRuntime. The examples below capture this with a createRuntime helper:
def createRuntime(knockerUpper: KnockerUpper.Agent[MyWorkflowId]): WorkflowRuntime[IO, MyWorkflowCtx, MyWorkflowId] = ???
NoOpKnockerUpper
import workflows4s.runtime.wakeup.NoOpKnockerUpper
val knockerUpper = NoOpKnockerUpper.Agent
val runtime: WorkflowRuntime[IO, MyWorkflowCtx, MyWorkflowId] = createRuntime(knockerUpper)
SleepingKnockerUpper
import workflows4s.runtime.wakeup.SleepingKnockerUpper
// all sleeps will be canceled on release
val knockerUpperResource: ResourceIO[SleepingKnockerUpper[MyWorkflowId]] = SleepingKnockerUpper.create()
knockerUpperResource.use(knockerUpper => {
  val runtime: WorkflowRuntime[IO, MyWorkflowCtx, MyWorkflowId] = createRuntime(knockerUpper)
  val init: IO[Unit] = knockerUpper.initialize(id => runtime.createInstance(id).flatMap(_.wakeup()))
  ???
})
FilesystemKnockerUpper
import workflows4s.runtime.wakeup.filesystem.FilesystemKnockerUpper
given FilesystemKnockerUpper.StringCodec[MyWorkflowId] = ???
val workDir: java.nio.file.Path = ???
val knockerUpper = FilesystemKnockerUpper.create[MyWorkflowId](workDir)
val runtime: WorkflowRuntime[IO, MyWorkflowCtx, MyWorkflowId] = createRuntime(knockerUpper)
val process: ResourceIO[Unit] = knockerUpper.initialize(id => runtime.createInstance(id).flatMap(_.wakeup()))
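Since initialize returns a ResourceIO[Unit] here, the background process presumably runs only while that resource is held. The sketch below shows the general cats-effect pattern for scoping such a resource around application logic. The process value is a stand-in built with Resource.make, and application is a hypothetical placeholder; neither comes from the library.

```scala
import cats.effect.{IO, Resource, ResourceIO}

// Stand-in for the `process` value above; the real one would start the wake-up loop.
val process: ResourceIO[Unit] =
  Resource.make(IO.println("wake-up process started"))(_ => IO.println("wake-up process stopped"))

// Hypothetical application logic that relies on wake-ups while it runs.
val application: IO[String] = IO.pure("done")

// The resource is acquired before `application` starts and released after it finishes.
val program: IO[String] = process.use(_ => application)
```

For a long-running service, process.useForever keeps the resource open until the surrounding fiber is canceled.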
QuartzKnockerUpper
import workflows4s.runtime.wakeup.quartz.QuartzKnockerUpper
val scheduler: org.quartz.Scheduler = ???
given StringCodec[MyWorkflowId] = ???
val dispatcher: cats.effect.std.Dispatcher[IO] = ???
scheduler.start()
// allows matching wakeups between restarts
val runtimeId = QuartzKnockerUpper.RuntimeId("my-runtime")
val knockerUpper = new QuartzKnockerUpper(runtimeId, scheduler, dispatcher)
val runtime: WorkflowRuntime[IO, MyWorkflowCtx, MyWorkflowId] = createRuntime(knockerUpper)
val initialization: IO[Unit] = knockerUpper.initialize(id => runtime.createInstance(id).flatMap(_.wakeup()))