feat: add workflow events, deployments, and metrics service methods

This commit is contained in:
2026-04-02 16:47:34 +02:00
parent 3b0530a409
commit dc30e09c77
7 changed files with 329 additions and 7 deletions

View File

@@ -339,17 +339,27 @@ type StreamWorkflowContext struct {
// EventStreamParams holds query parameters for streaming workflow events.
type EventStreamParams struct {
Source *EventSource
LastEventID *string
Scope *Scope
Scope *Scope
ActivityName *string
ActivityID *string
WorkflowName *string
WorkflowExecID *string
RootWorkflowExecID *string
ParentWorkflowExecID *string
Stream *string
StartSeq *int
MetadataFilters map[string]any
WorkflowEventTypes []EventType
LastEventID *string
}
// EventListParams holds query parameters for listing workflow events.
type EventListParams struct {
Source *EventSource
Scope *Scope
Cursor *string
Limit *int
RootWorkflowExecID *string
WorkflowExecID *string
WorkflowRunID *string
Limit *int
Cursor *string
}
// EventListResponse is the response from listing workflow events.

41
workflows_deployments.go Normal file
View File

@@ -0,0 +1,41 @@
package mistral
import (
"context"
"fmt"
"net/url"
"strconv"
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
)
// ListWorkflowDeployments lists workflow deployments.
func (c *Client) ListWorkflowDeployments(ctx context.Context, params *workflow.DeploymentListParams) (*workflow.DeploymentListResponse, error) {
path := "/v1/workflows/deployments"
if params != nil {
q := url.Values{}
if params.ActiveOnly != nil {
q.Set("active_only", strconv.FormatBool(*params.ActiveOnly))
}
if params.WorkflowName != nil {
q.Set("workflow_name", *params.WorkflowName)
}
if encoded := q.Encode(); encoded != "" {
path += "?" + encoded
}
}
var resp workflow.DeploymentListResponse
if err := c.doJSON(ctx, "GET", path, nil, &resp); err != nil {
return nil, err
}
return &resp, nil
}
// GetWorkflowDeployment retrieves a workflow deployment by ID.
func (c *Client) GetWorkflowDeployment(ctx context.Context, deploymentID string) (*workflow.Deployment, error) {
var resp workflow.Deployment
if err := c.doJSON(ctx, "GET", fmt.Sprintf("/v1/workflows/deployments/%s", deploymentID), nil, &resp); err != nil {
return nil, err
}
return &resp, nil
}

View File

@@ -0,0 +1,57 @@
package mistral
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
)
func TestListWorkflowDeployments_Success(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/v1/workflows/deployments" {
t.Errorf("got path %s", r.URL.Path)
}
json.NewEncoder(w).Encode(map[string]any{
"deployments": []map[string]any{
{"id": "dep-1", "name": "prod", "is_active": true, "created_at": "2026-01-01", "updated_at": "2026-01-01"},
},
})
}))
defer server.Close()
client := NewClient("key", WithBaseURL(server.URL))
resp, err := client.ListWorkflowDeployments(context.Background(), nil)
if err != nil {
t.Fatal(err)
}
if len(resp.Deployments) != 1 {
t.Fatalf("got %d deployments", len(resp.Deployments))
}
if resp.Deployments[0].Name != "prod" {
t.Errorf("got name %q", resp.Deployments[0].Name)
}
}
func TestGetWorkflowDeployment_Success(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/v1/workflows/deployments/dep-1" {
t.Errorf("got path %s", r.URL.Path)
}
json.NewEncoder(w).Encode(map[string]any{
"id": "dep-1", "name": "prod", "is_active": true,
"created_at": "2026-01-01", "updated_at": "2026-01-01",
})
}))
defer server.Close()
client := NewClient("key", WithBaseURL(server.URL))
dep, err := client.GetWorkflowDeployment(context.Background(), "dep-1")
if err != nil {
t.Fatal(err)
}
if dep.ID != "dep-1" {
t.Errorf("got id %q", dep.ID)
}
}

99
workflows_events.go Normal file
View File

@@ -0,0 +1,99 @@
package mistral
import (
"context"
"encoding/json"
"net/url"
"strconv"
"strings"
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
)
// StreamWorkflowEvents streams workflow events via SSE.
func (c *Client) StreamWorkflowEvents(ctx context.Context, params *workflow.EventStreamParams) (*WorkflowEventStream, error) {
path := "/v1/workflows/events/stream"
if params != nil {
q := url.Values{}
if params.Scope != nil {
q.Set("scope", string(*params.Scope))
}
if params.ActivityName != nil {
q.Set("activity_name", *params.ActivityName)
}
if params.ActivityID != nil {
q.Set("activity_id", *params.ActivityID)
}
if params.WorkflowName != nil {
q.Set("workflow_name", *params.WorkflowName)
}
if params.WorkflowExecID != nil {
q.Set("workflow_exec_id", *params.WorkflowExecID)
}
if params.RootWorkflowExecID != nil {
q.Set("root_workflow_exec_id", *params.RootWorkflowExecID)
}
if params.ParentWorkflowExecID != nil {
q.Set("parent_workflow_exec_id", *params.ParentWorkflowExecID)
}
if params.Stream != nil {
q.Set("stream", *params.Stream)
}
if params.StartSeq != nil {
q.Set("start_seq", strconv.Itoa(*params.StartSeq))
}
if params.MetadataFilters != nil {
data, _ := json.Marshal(params.MetadataFilters)
q.Set("metadata_filters", string(data))
}
if len(params.WorkflowEventTypes) > 0 {
types := make([]string, len(params.WorkflowEventTypes))
for i, et := range params.WorkflowEventTypes {
types[i] = string(et)
}
q.Set("workflow_event_types", strings.Join(types, ","))
}
if params.LastEventID != nil {
q.Set("last_event_id", *params.LastEventID)
}
if encoded := q.Encode(); encoded != "" {
path += "?" + encoded
}
}
resp, err := c.doStream(ctx, "GET", path, nil)
if err != nil {
return nil, err
}
return newWorkflowEventStream(resp.Body), nil
}
// ListWorkflowEvents lists workflow events.
func (c *Client) ListWorkflowEvents(ctx context.Context, params *workflow.EventListParams) (*workflow.EventListResponse, error) {
path := "/v1/workflows/events/list"
if params != nil {
q := url.Values{}
if params.RootWorkflowExecID != nil {
q.Set("root_workflow_exec_id", *params.RootWorkflowExecID)
}
if params.WorkflowExecID != nil {
q.Set("workflow_exec_id", *params.WorkflowExecID)
}
if params.WorkflowRunID != nil {
q.Set("workflow_run_id", *params.WorkflowRunID)
}
if params.Limit != nil {
q.Set("limit", strconv.Itoa(*params.Limit))
}
if params.Cursor != nil {
q.Set("cursor", *params.Cursor)
}
if encoded := q.Encode(); encoded != "" {
path += "?" + encoded
}
}
var resp workflow.EventListResponse
if err := c.doJSON(ctx, "GET", path, nil, &resp); err != nil {
return nil, err
}
return &resp, nil
}

40
workflows_events_test.go Normal file
View File

@@ -0,0 +1,40 @@
package mistral
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
)
func TestListWorkflowEvents_Success(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/v1/workflows/events/list" {
t.Errorf("got path %s", r.URL.Path)
}
if r.URL.Query().Get("limit") != "50" {
t.Errorf("got limit %q", r.URL.Query().Get("limit"))
}
json.NewEncoder(w).Encode(map[string]any{
"events": []map[string]any{{"event_type": "WORKFLOW_EXECUTION_STARTED"}},
"next_cursor": "cur-1",
})
}))
defer server.Close()
client := NewClient("key", WithBaseURL(server.URL))
limit := 50
resp, err := client.ListWorkflowEvents(context.Background(), &workflow.EventListParams{Limit: &limit})
if err != nil {
t.Fatal(err)
}
if len(resp.Events) != 1 {
t.Fatalf("got %d events", len(resp.Events))
}
if resp.NextCursor == nil || *resp.NextCursor != "cur-1" {
t.Errorf("got cursor %v", resp.NextCursor)
}
}

31
workflows_metrics.go Normal file
View File

@@ -0,0 +1,31 @@
package mistral
import (
"context"
"fmt"
"net/url"
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
)
// GetWorkflowMetrics retrieves performance metrics for a workflow.
func (c *Client) GetWorkflowMetrics(ctx context.Context, workflowName string, params *workflow.MetricsParams) (*workflow.Metrics, error) {
path := fmt.Sprintf("/v1/workflows/%s/metrics", workflowName)
if params != nil {
q := url.Values{}
if params.StartTime != nil {
q.Set("start_time", *params.StartTime)
}
if params.EndTime != nil {
q.Set("end_time", *params.EndTime)
}
if encoded := q.Encode(); encoded != "" {
path += "?" + encoded
}
}
var resp workflow.Metrics
if err := c.doJSON(ctx, "GET", path, nil, &resp); err != nil {
return nil, err
}
return &resp, nil
}

44
workflows_metrics_test.go Normal file
View File

@@ -0,0 +1,44 @@
package mistral
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
)
func TestGetWorkflowMetrics_Success(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/v1/workflows/my-flow/metrics" {
t.Errorf("got path %s", r.URL.Path)
}
if r.URL.Query().Get("start_time") != "2026-01-01T00:00:00Z" {
t.Errorf("got start_time %q", r.URL.Query().Get("start_time"))
}
json.NewEncoder(w).Encode(map[string]any{
"execution_count": map[string]any{"value": 100},
"success_count": map[string]any{"value": 95},
"error_count": map[string]any{"value": 5},
"average_latency_ms": map[string]any{"value": 1234.5},
"latency_over_time": map[string]any{"value": [][]float64{{1711929600, 1200}, {1711929660, 1300}}},
"retry_rate": map[string]any{"value": 0.02},
})
}))
defer server.Close()
client := NewClient("key", WithBaseURL(server.URL))
start := "2026-01-01T00:00:00Z"
resp, err := client.GetWorkflowMetrics(context.Background(), "my-flow", &workflow.MetricsParams{StartTime: &start})
if err != nil {
t.Fatal(err)
}
if resp.ExecutionCount.Value != 100 {
t.Errorf("got execution_count %v", resp.ExecutionCount.Value)
}
if resp.AverageLatencyMs.Value != 1234.5 {
t.Errorf("got average_latency_ms %v", resp.AverageLatencyMs.Value)
}
}