# Postgres Runtime
`PostgresRuntime` provides a simpler, yet less powerful, alternative to full-fledged event-sourcing solutions such as Akka/Pekko. It stores events in a database table without maintaining any state in memory: each interaction with the runtime reads events directly from the journal. This design results in higher latency but lower memory usage.
## Usage
Follow these steps to use the Postgres runtime. Detailed explanations are provided in the sections below:

1. Add the dependency:

   ```scala
   "org.business4s" %% "workflows4s-doobie" % "<version>"
   ```

2. Include the database migration as part of your application lifecycle.
3. Implement the `EventCodec`.
4. Use the runtime.
## Database Migrations
The Postgres runtime requires a basic database table to store events. While alternative schemas are possible, they may necessitate a custom `WorkflowStorage`. Refer to Other Databases & Custom Storages below for more details.
```sql
CREATE TABLE IF NOT EXISTS workflow_journal
(
    event_id    SERIAL PRIMARY KEY,                          -- Auto-incrementing primary key
    workflow_id BIGINT    NOT NULL,                          -- The workflow ID
    event_data  BYTEA     NOT NULL,                          -- The event data (binary)
    created_at  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP -- Timestamp of event creation
);

CREATE INDEX IF NOT EXISTS idx_workflow_id ON workflow_journal (workflow_id);
```
This implementation makes several assumptions and is intentionally kept simple. For more demanding use cases, consider using a different runtime or implementing a custom storage solution.
## Event Codec
Events are stored in the database as binary data. The `EventCodec` handles the serialization and deserialization of these events.
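To illustrate the idea, here is a minimal sketch of a bytes-based codec. Note that the trait below is a simplified stand-in written for this example; the real `EventCodec` in workflows4s may have different method names and signatures, and `OrderPlaced` is a hypothetical event type.

```scala
import java.nio.charset.StandardCharsets

// Simplified stand-in for the library trait, for illustration only;
// the real workflows4s EventCodec may differ in shape.
trait EventCodec[E] {
  def write(event: E): Array[Byte]
  def read(bytes: Array[Byte]): Either[Throwable, E]
}

// Hypothetical event type; in a real workflow this is your context's Event ADT.
case class OrderPlaced(orderId: Long)

val codec: EventCodec[OrderPlaced] = new EventCodec[OrderPlaced] {
  // Serialize the ID as a UTF-8 string; real codecs typically use JSON or protobuf.
  def write(e: OrderPlaced): Array[Byte] =
    e.orderId.toString.getBytes(StandardCharsets.UTF_8)

  // Deserialization can fail on corrupt data, hence the Either.
  def read(bytes: Array[Byte]): Either[Throwable, OrderPlaced] =
    scala.util.Try(OrderPlaced(new String(bytes, StandardCharsets.UTF_8).toLong)).toEither
}
```

Whatever encoding you choose, keep it backward compatible: events already persisted in the journal must remain readable after the schema of your event types evolves.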
## Example
Here is an example of using the Postgres runtime:
```scala
import MyWorkflowCtx.*

val knockerUpper: KnockerUpper.Agent[WorkflowId] = ???
val workflow: WIO.Initial                        = ???
val initialState: State                          = ???
val eventCodec: EventCodec[Event]                = ???
val transactor: Transactor[IO]                   = ???

val runtime: PostgresRuntime[Ctx] =
  PostgresRuntime.default(workflow, initialState, eventCodec, transactor, knockerUpper)

val wfInstance: IO[WorkflowInstance[IO, WCState[Ctx]]] =
  runtime.createInstance(WorkflowId(1L))
```
## Locking the Workflow
Consider the following scenario:
- A workflow awaits two signals in parallel.
- Both signals arrive simultaneously.
- Only one path of the workflow can be followed.
To address this, workflows are locked, ensuring that only a single signal is processed at a time.
### Workflow ID
Locking is implemented using PostgreSQL advisory locks, which require a single `BIGINT` as a lock identifier. For simplicity, `Long` values are used as `WorkflowId`s and serve directly as lock IDs.
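For reference, advisory locks can be exercised directly in plain SQL (this shows the underlying Postgres mechanism, not the runtime's exact calls; the lock key `1` stands in for a workflow ID):

```sql
-- Take a transaction-scoped advisory lock keyed by the workflow ID.
BEGIN;
SELECT pg_advisory_xact_lock(1); -- blocks if another transaction holds lock 1
-- ... read the journal and append events for workflow 1 ...
COMMIT;                          -- the lock is released automatically at transaction end
```

Because the lock is held for the duration of the transaction, two signals arriving simultaneously for the same workflow are serialized: the second one waits until the first has committed its events.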
## Other Databases & Custom Storages
Support for additional DBMSs or alternative workflow storage approaches can be added by implementing the `WorkflowStorage` interface. This interface specifies how to:

- Save and read events
- Lock workflows

By extending `WorkflowStorage`, you can adapt the runtime to fit your specific requirements.
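As a rough illustration of the shape such an implementation takes, here is an in-memory sketch. The trait below is a simplified stand-in invented for this example: the real `WorkflowStorage` in workflows4s is effect-polymorphic and its method names and signatures will differ.

```scala
import scala.collection.concurrent.TrieMap

// Simplified stand-in for the library's WorkflowStorage, for illustration only.
trait WorkflowStorage[Id, Event] {
  def saveEvent(id: Id, event: Event): Unit
  def getEvents(id: Id): List[Event]
  def withLock[A](id: Id)(body: => A): A
}

final class InMemoryStorage[Id, Event] extends WorkflowStorage[Id, Event] {
  private val journal = TrieMap.empty[Id, Vector[Event]]
  private val locks   = TrieMap.empty[Id, Object]

  // Append the event to the workflow's journal, preserving insertion order.
  def saveEvent(id: Id, event: Event): Unit =
    journal.updateWith(id)(prev => Some(prev.getOrElse(Vector.empty) :+ event))

  // Read back all events for a workflow, oldest first.
  def getEvents(id: Id): List[Event] =
    journal.getOrElse(id, Vector.empty).toList

  // Coarse JVM-local lock per workflow; the Postgres implementation
  // uses advisory locks for the same purpose across processes.
  def withLock[A](id: Id)(body: => A): A =
    locks.getOrElseUpdate(id, new Object).synchronized(body)
}
```

An in-memory storage like this is only useful for tests; a real alternative backend (e.g. another DBMS) would implement the same three concerns — appending events, reading the journal, and locking — against that system's primitives.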