feat(workflow): add sealed Event interface with 17 types and UnmarshalEvent

Implements EventType/EventSource/Scope enums, eventBase struct, 17 concrete
event types with typed attributes for Started/Completed/Failed, UnknownEvent
fallback, UnmarshalEvent dispatcher, and SSE envelope types.
This commit is contained in:
2026-04-02 16:27:39 +02:00
parent ae247c9e2e
commit f185a03f58
2 changed files with 514 additions and 0 deletions

356
workflow/event.go Normal file
View File

@@ -0,0 +1,356 @@
package workflow
import (
"encoding/json"
"fmt"
)
// EventType identifies the kind of workflow event.
type EventType string
const (
EventWorkflowStarted EventType = "WORKFLOW_EXECUTION_STARTED"
EventWorkflowCompleted EventType = "WORKFLOW_EXECUTION_COMPLETED"
EventWorkflowFailed EventType = "WORKFLOW_EXECUTION_FAILED"
EventWorkflowCanceled EventType = "WORKFLOW_EXECUTION_CANCELED"
EventWorkflowContinuedAsNew EventType = "WORKFLOW_EXECUTION_CONTINUED_AS_NEW"
EventWorkflowTaskTimedOut EventType = "WORKFLOW_TASK_TIMED_OUT"
EventWorkflowTaskFailed EventType = "WORKFLOW_TASK_FAILED"
EventCustomTaskStarted EventType = "CUSTOM_TASK_STARTED"
EventCustomTaskInProgress EventType = "CUSTOM_TASK_IN_PROGRESS"
EventCustomTaskCompleted EventType = "CUSTOM_TASK_COMPLETED"
EventCustomTaskFailed EventType = "CUSTOM_TASK_FAILED"
EventCustomTaskTimedOut EventType = "CUSTOM_TASK_TIMED_OUT"
EventCustomTaskCanceled EventType = "CUSTOM_TASK_CANCELED"
EventActivityTaskStarted EventType = "ACTIVITY_TASK_STARTED"
EventActivityTaskCompleted EventType = "ACTIVITY_TASK_COMPLETED"
EventActivityTaskRetrying EventType = "ACTIVITY_TASK_RETRYING"
EventActivityTaskFailed EventType = "ACTIVITY_TASK_FAILED"
)
// EventSource identifies where an event originated.
type EventSource string
const (
EventSourceDatabase EventSource = "DATABASE"
EventSourceLive EventSource = "LIVE"
)
// Scope identifies the scope of an event subscription.
type Scope string
const (
ScopeActivity Scope = "activity"
ScopeWorkflow Scope = "workflow"
ScopeAll Scope = "*"
)
// Event is a sealed interface for workflow execution events.
type Event interface {
workflowEvent()
EventType() EventType
}
// eventBase holds fields common to all workflow events.
type eventBase struct {
ID string `json:"event_id"`
Timestamp int64 `json:"event_timestamp"`
RootWorkflowExecID string `json:"root_workflow_exec_id"`
ParentWorkflowExecID *string `json:"parent_workflow_exec_id"`
WorkflowExecID string `json:"workflow_exec_id"`
WorkflowRunID string `json:"workflow_run_id"`
WorkflowName string `json:"workflow_name"`
}
// WorkflowStartedAttributes holds typed attributes for workflow started events.
type WorkflowStartedAttributes struct {
TaskID string `json:"task_id"`
}
// JSONPayload holds a typed JSON value.
type JSONPayload struct {
Value any `json:"value"`
Type string `json:"type"`
}
// WorkflowCompletedAttributes holds typed attributes for workflow completed events.
type WorkflowCompletedAttributes struct {
TaskID string `json:"task_id"`
Result JSONPayload `json:"result"`
}
// WorkflowFailedAttributes holds typed attributes for workflow failed events.
type WorkflowFailedAttributes struct {
TaskID string `json:"task_id"`
Failure any `json:"failure"`
}
// WorkflowExecutionStartedEvent signals that a workflow execution has started.
type WorkflowExecutionStartedEvent struct {
eventBase
Attributes WorkflowStartedAttributes `json:"attributes"`
}
func (*WorkflowExecutionStartedEvent) workflowEvent() {}
func (*WorkflowExecutionStartedEvent) EventType() EventType { return EventWorkflowStarted }
// WorkflowExecutionCompletedEvent signals that a workflow execution has completed.
type WorkflowExecutionCompletedEvent struct {
eventBase
Attributes WorkflowCompletedAttributes `json:"attributes"`
}
func (*WorkflowExecutionCompletedEvent) workflowEvent() {}
func (*WorkflowExecutionCompletedEvent) EventType() EventType { return EventWorkflowCompleted }
// WorkflowExecutionFailedEvent signals that a workflow execution has failed.
type WorkflowExecutionFailedEvent struct {
eventBase
Attributes WorkflowFailedAttributes `json:"attributes"`
}
func (*WorkflowExecutionFailedEvent) workflowEvent() {}
func (*WorkflowExecutionFailedEvent) EventType() EventType { return EventWorkflowFailed }
// WorkflowExecutionCanceledEvent signals that a workflow execution was canceled.
type WorkflowExecutionCanceledEvent struct {
eventBase
Attributes json.RawMessage `json:"attributes"`
}
func (*WorkflowExecutionCanceledEvent) workflowEvent() {}
func (*WorkflowExecutionCanceledEvent) EventType() EventType { return EventWorkflowCanceled }
// WorkflowExecutionContinuedAsNewEvent signals that a workflow continued as a new execution.
type WorkflowExecutionContinuedAsNewEvent struct {
eventBase
Attributes json.RawMessage `json:"attributes"`
}
func (*WorkflowExecutionContinuedAsNewEvent) workflowEvent() {}
func (*WorkflowExecutionContinuedAsNewEvent) EventType() EventType { return EventWorkflowContinuedAsNew }
// WorkflowTaskTimedOutEvent signals that a workflow task timed out.
type WorkflowTaskTimedOutEvent struct {
eventBase
Attributes json.RawMessage `json:"attributes"`
}
func (*WorkflowTaskTimedOutEvent) workflowEvent() {}
func (*WorkflowTaskTimedOutEvent) EventType() EventType { return EventWorkflowTaskTimedOut }
// WorkflowTaskFailedEvent signals that a workflow task failed.
type WorkflowTaskFailedEvent struct {
eventBase
Attributes json.RawMessage `json:"attributes"`
}
func (*WorkflowTaskFailedEvent) workflowEvent() {}
func (*WorkflowTaskFailedEvent) EventType() EventType { return EventWorkflowTaskFailed }
// CustomTaskStartedEvent signals that a custom task has started.
type CustomTaskStartedEvent struct {
eventBase
Attributes json.RawMessage `json:"attributes"`
}
func (*CustomTaskStartedEvent) workflowEvent() {}
func (*CustomTaskStartedEvent) EventType() EventType { return EventCustomTaskStarted }
// CustomTaskInProgressEvent signals that a custom task is in progress.
type CustomTaskInProgressEvent struct {
eventBase
Attributes json.RawMessage `json:"attributes"`
}
func (*CustomTaskInProgressEvent) workflowEvent() {}
func (*CustomTaskInProgressEvent) EventType() EventType { return EventCustomTaskInProgress }
// CustomTaskCompletedEvent signals that a custom task has completed.
type CustomTaskCompletedEvent struct {
eventBase
Attributes json.RawMessage `json:"attributes"`
}
func (*CustomTaskCompletedEvent) workflowEvent() {}
func (*CustomTaskCompletedEvent) EventType() EventType { return EventCustomTaskCompleted }
// CustomTaskFailedEvent signals that a custom task has failed.
type CustomTaskFailedEvent struct {
eventBase
Attributes json.RawMessage `json:"attributes"`
}
func (*CustomTaskFailedEvent) workflowEvent() {}
func (*CustomTaskFailedEvent) EventType() EventType { return EventCustomTaskFailed }
// CustomTaskTimedOutEvent signals that a custom task timed out.
type CustomTaskTimedOutEvent struct {
eventBase
Attributes json.RawMessage `json:"attributes"`
}
func (*CustomTaskTimedOutEvent) workflowEvent() {}
func (*CustomTaskTimedOutEvent) EventType() EventType { return EventCustomTaskTimedOut }
// CustomTaskCanceledEvent signals that a custom task was canceled.
type CustomTaskCanceledEvent struct {
eventBase
Attributes json.RawMessage `json:"attributes"`
}
func (*CustomTaskCanceledEvent) workflowEvent() {}
func (*CustomTaskCanceledEvent) EventType() EventType { return EventCustomTaskCanceled }
// ActivityTaskStartedEvent signals that an activity task has started.
type ActivityTaskStartedEvent struct {
eventBase
Attributes json.RawMessage `json:"attributes"`
}
func (*ActivityTaskStartedEvent) workflowEvent() {}
func (*ActivityTaskStartedEvent) EventType() EventType { return EventActivityTaskStarted }
// ActivityTaskCompletedEvent signals that an activity task has completed.
type ActivityTaskCompletedEvent struct {
eventBase
Attributes json.RawMessage `json:"attributes"`
}
func (*ActivityTaskCompletedEvent) workflowEvent() {}
func (*ActivityTaskCompletedEvent) EventType() EventType { return EventActivityTaskCompleted }
// ActivityTaskRetryingEvent signals that an activity task is being retried.
type ActivityTaskRetryingEvent struct {
eventBase
Attributes json.RawMessage `json:"attributes"`
}
func (*ActivityTaskRetryingEvent) workflowEvent() {}
func (*ActivityTaskRetryingEvent) EventType() EventType { return EventActivityTaskRetrying }
// ActivityTaskFailedEvent signals that an activity task has failed.
type ActivityTaskFailedEvent struct {
eventBase
Attributes json.RawMessage `json:"attributes"`
}
func (*ActivityTaskFailedEvent) workflowEvent() {}
func (*ActivityTaskFailedEvent) EventType() EventType { return EventActivityTaskFailed }
// UnknownEvent holds an event with an unrecognized event_type.
// This prevents the SDK from breaking when new event types are added.
type UnknownEvent struct {
eventBase
RawType string
Raw json.RawMessage
}
func (*UnknownEvent) workflowEvent() {}
func (e *UnknownEvent) EventType() EventType { return EventType(e.RawType) }
// UnmarshalEvent dispatches JSON to the concrete Event type
// based on the "event_type" discriminator field.
func UnmarshalEvent(data []byte) (Event, error) {
var probe struct {
Type string `json:"event_type"`
}
if err := json.Unmarshal(data, &probe); err != nil {
return nil, fmt.Errorf("mistral: unmarshal workflow event: %w", err)
}
switch probe.Type {
case string(EventWorkflowStarted):
var e WorkflowExecutionStartedEvent
return &e, json.Unmarshal(data, &e)
case string(EventWorkflowCompleted):
var e WorkflowExecutionCompletedEvent
return &e, json.Unmarshal(data, &e)
case string(EventWorkflowFailed):
var e WorkflowExecutionFailedEvent
return &e, json.Unmarshal(data, &e)
case string(EventWorkflowCanceled):
var e WorkflowExecutionCanceledEvent
return &e, json.Unmarshal(data, &e)
case string(EventWorkflowContinuedAsNew):
var e WorkflowExecutionContinuedAsNewEvent
return &e, json.Unmarshal(data, &e)
case string(EventWorkflowTaskTimedOut):
var e WorkflowTaskTimedOutEvent
return &e, json.Unmarshal(data, &e)
case string(EventWorkflowTaskFailed):
var e WorkflowTaskFailedEvent
return &e, json.Unmarshal(data, &e)
case string(EventCustomTaskStarted):
var e CustomTaskStartedEvent
return &e, json.Unmarshal(data, &e)
case string(EventCustomTaskInProgress):
var e CustomTaskInProgressEvent
return &e, json.Unmarshal(data, &e)
case string(EventCustomTaskCompleted):
var e CustomTaskCompletedEvent
return &e, json.Unmarshal(data, &e)
case string(EventCustomTaskFailed):
var e CustomTaskFailedEvent
return &e, json.Unmarshal(data, &e)
case string(EventCustomTaskTimedOut):
var e CustomTaskTimedOutEvent
return &e, json.Unmarshal(data, &e)
case string(EventCustomTaskCanceled):
var e CustomTaskCanceledEvent
return &e, json.Unmarshal(data, &e)
case string(EventActivityTaskStarted):
var e ActivityTaskStartedEvent
return &e, json.Unmarshal(data, &e)
case string(EventActivityTaskCompleted):
var e ActivityTaskCompletedEvent
return &e, json.Unmarshal(data, &e)
case string(EventActivityTaskRetrying):
var e ActivityTaskRetryingEvent
return &e, json.Unmarshal(data, &e)
case string(EventActivityTaskFailed):
var e ActivityTaskFailedEvent
return &e, json.Unmarshal(data, &e)
default:
var base eventBase
if err := json.Unmarshal(data, &base); err != nil {
return nil, fmt.Errorf("mistral: unmarshal workflow event base: %w", err)
}
return &UnknownEvent{
eventBase: base,
RawType: probe.Type,
Raw: json.RawMessage(data),
}, nil
}
}
// StreamPayload is a single SSE payload from the workflow event stream.
type StreamPayload struct {
Data json.RawMessage `json:"data"`
}
// StreamWorkflowContext holds context for a workflow event stream.
type StreamWorkflowContext struct {
WorkflowName string `json:"workflow_name"`
ExecutionID string `json:"execution_id"`
}
// EventStreamParams holds query parameters for streaming workflow events.
type EventStreamParams struct {
Source *EventSource
LastEventID *string
Scope *Scope
}
// EventListParams holds query parameters for listing workflow events.
type EventListParams struct {
Source *EventSource
Scope *Scope
Cursor *string
Limit *int
}
// EventListResponse is the response from listing workflow events.
type EventListResponse struct {
Events []json.RawMessage `json:"events"`
NextCursor *string `json:"next_cursor,omitempty"`
}

158
workflow/event_test.go Normal file
View File

@@ -0,0 +1,158 @@
package workflow
import (
"encoding/json"
"fmt"
"testing"
)
func TestUnmarshalEvent_WorkflowExecutionCompleted(t *testing.T) {
data := []byte(`{
"event_id": "evt-1",
"event_timestamp": 1711929600000000000,
"root_workflow_exec_id": "root-1",
"parent_workflow_exec_id": null,
"workflow_exec_id": "exec-1",
"workflow_run_id": "run-1",
"workflow_name": "my-workflow",
"event_type": "WORKFLOW_EXECUTION_COMPLETED",
"attributes": {"task_id": "t1", "result": {"value": {"answer": 42}, "type": "json"}}
}`)
event, err := UnmarshalEvent(data)
if err != nil {
t.Fatal(err)
}
e, ok := event.(*WorkflowExecutionCompletedEvent)
if !ok {
t.Fatalf("expected *WorkflowExecutionCompletedEvent, got %T", event)
}
if e.ID != "evt-1" {
t.Errorf("got ID %q", e.ID)
}
if e.WorkflowName != "my-workflow" {
t.Errorf("got WorkflowName %q", e.WorkflowName)
}
if e.EventType() != EventWorkflowCompleted {
t.Errorf("got EventType %q", e.EventType())
}
}
func TestUnmarshalEvent_CustomTaskStarted(t *testing.T) {
data := []byte(`{
"event_id": "evt-2",
"event_timestamp": 1711929600000000000,
"root_workflow_exec_id": "root-1",
"parent_workflow_exec_id": "parent-1",
"workflow_exec_id": "exec-1",
"workflow_run_id": "run-1",
"workflow_name": "my-workflow",
"event_type": "CUSTOM_TASK_STARTED",
"attributes": {}
}`)
event, err := UnmarshalEvent(data)
if err != nil {
t.Fatal(err)
}
e, ok := event.(*CustomTaskStartedEvent)
if !ok {
t.Fatalf("expected *CustomTaskStartedEvent, got %T", event)
}
parent := "parent-1"
if e.ParentWorkflowExecID == nil || *e.ParentWorkflowExecID != parent {
t.Errorf("expected parent %q, got %v", parent, e.ParentWorkflowExecID)
}
}
func TestUnmarshalEvent_ActivityTaskRetrying(t *testing.T) {
data := []byte(`{
"event_id": "evt-3",
"event_timestamp": 1711929600000000000,
"root_workflow_exec_id": "root-1",
"parent_workflow_exec_id": null,
"workflow_exec_id": "exec-1",
"workflow_run_id": "run-1",
"workflow_name": "my-workflow",
"event_type": "ACTIVITY_TASK_RETRYING",
"attributes": {}
}`)
event, err := UnmarshalEvent(data)
if err != nil {
t.Fatal(err)
}
if _, ok := event.(*ActivityTaskRetryingEvent); !ok {
t.Fatalf("expected *ActivityTaskRetryingEvent, got %T", event)
}
}
func TestUnmarshalEvent_UnknownType(t *testing.T) {
data := []byte(`{
"event_id": "evt-4",
"event_timestamp": 1711929600000000000,
"root_workflow_exec_id": "root-1",
"parent_workflow_exec_id": null,
"workflow_exec_id": "exec-1",
"workflow_run_id": "run-1",
"workflow_name": "my-workflow",
"event_type": "FUTURE_EVENT_TYPE",
"attributes": {}
}`)
event, err := UnmarshalEvent(data)
if err != nil {
t.Fatal(err)
}
unk, ok := event.(*UnknownEvent)
if !ok {
t.Fatalf("expected *UnknownEvent, got %T", event)
}
if unk.RawType != "FUTURE_EVENT_TYPE" {
t.Errorf("got RawType %q", unk.RawType)
}
}
func TestUnmarshalEvent_AllTypes(t *testing.T) {
types := []struct {
eventType string
wantType string
}{
{"WORKFLOW_EXECUTION_STARTED", "*workflow.WorkflowExecutionStartedEvent"},
{"WORKFLOW_EXECUTION_COMPLETED", "*workflow.WorkflowExecutionCompletedEvent"},
{"WORKFLOW_EXECUTION_FAILED", "*workflow.WorkflowExecutionFailedEvent"},
{"WORKFLOW_EXECUTION_CANCELED", "*workflow.WorkflowExecutionCanceledEvent"},
{"WORKFLOW_EXECUTION_CONTINUED_AS_NEW", "*workflow.WorkflowExecutionContinuedAsNewEvent"},
{"WORKFLOW_TASK_TIMED_OUT", "*workflow.WorkflowTaskTimedOutEvent"},
{"WORKFLOW_TASK_FAILED", "*workflow.WorkflowTaskFailedEvent"},
{"CUSTOM_TASK_STARTED", "*workflow.CustomTaskStartedEvent"},
{"CUSTOM_TASK_IN_PROGRESS", "*workflow.CustomTaskInProgressEvent"},
{"CUSTOM_TASK_COMPLETED", "*workflow.CustomTaskCompletedEvent"},
{"CUSTOM_TASK_FAILED", "*workflow.CustomTaskFailedEvent"},
{"CUSTOM_TASK_TIMED_OUT", "*workflow.CustomTaskTimedOutEvent"},
{"CUSTOM_TASK_CANCELED", "*workflow.CustomTaskCanceledEvent"},
{"ACTIVITY_TASK_STARTED", "*workflow.ActivityTaskStartedEvent"},
{"ACTIVITY_TASK_COMPLETED", "*workflow.ActivityTaskCompletedEvent"},
{"ACTIVITY_TASK_RETRYING", "*workflow.ActivityTaskRetryingEvent"},
{"ACTIVITY_TASK_FAILED", "*workflow.ActivityTaskFailedEvent"},
}
for _, tt := range types {
t.Run(tt.eventType, func(t *testing.T) {
data, _ := json.Marshal(map[string]any{
"event_id": "evt",
"event_timestamp": 1711929600000000000,
"root_workflow_exec_id": "root",
"parent_workflow_exec_id": nil,
"workflow_exec_id": "exec",
"workflow_run_id": "run",
"workflow_name": "wf",
"event_type": tt.eventType,
"attributes": map[string]any{},
})
event, err := UnmarshalEvent(data)
if err != nil {
t.Fatal(err)
}
got := fmt.Sprintf("%T", event)
if got != tt.wantType {
t.Errorf("event_type %q: got %s, want %s", tt.eventType, got, tt.wantType)
}
})
}
}