Skip to content

Products

Products are files generated by workflow steps — reports, invoices, exports, etc. A ProductSender defines how these files are delivered to external systems.

Sending a Product

Call SendProduct from within a step to produce a file:

go
func GenerateInvoice(ctx context.Context) (string, error) {
    content := "Invoice #12345\nAmount: $99.00"

    err := turbine.SendProduct(ctx, "invoice.txt", bytes.NewReader([]byte(content)), map[string]any{
        "type":       "invoice",
        "invoice_id": "INV-12345",
    })
    if err != nil {
        return "", err
    }

    return "generated", nil
}

INFO

Products are deduplicated by (workflow_id, step_id, file_name) — on recovery, a duplicate SendProduct call is a no-op.

WARNING

SendProduct must be called from within a step (turbine.Do or turbine.DoAsync). It cannot be called directly from the workflow function.

ProductSender

Implement ProductSender to define how products are delivered:

go
type ProductSender interface {
    Send(ctx context.Context, product ProductRecord) error
}
go
type S3Sender struct {
    bucket string
}

func (s *S3Sender) Send(ctx context.Context, product turbine.ProductRecord) error {
    fmt.Printf("uploading %s (%d bytes) to s3://%s/\n", product.FileName, product.Size, s.bucket)
    // upload logic here
    return nil
}

Register it in the config:

go
rt := turbine.Setup(app, turbine.Config{
    ProductSender: &S3Sender{bucket: "my-exports"},
})

Or set it after construction:

go
rt.SetProductSender(sender)

If no sender is configured, products are accessible via the PocketBase API but not sent anywhere.

ProductRecord

The ProductRecord passed to your sender contains:

FieldTypeDescription
IDstringPocketBase record ID
FileNamestringOriginal filename
SizeintFile size in bytes
Metadatamap[string]anyCustom metadata
FileURLstringRelative path to download the file

WorkflowSender

Turbine includes a built-in sender that forwards products to another workflow:

go
processExport := func(ctx turbine.Context, product turbine.ProductRecord) (string, error) {
    // process the product file
    return "processed: " + product.FileName, nil
}

turbine.Register(rt, processExport)

sender := turbine.NewWorkflowSender(rt, processExport)
rt.SetProductSender(sender)

Status Lifecycle

Products go through these statuses:

StatusMeaning
storedFile saved to database
sentSender succeeded
failedSender returned an error (file is still stored)

TIP

Products are cascade-deleted when their parent workflow is garbage collected. If you need to keep products longer than the GC retention period, copy them to external storage via a ProductSender.

See the products example for a working demo.