Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 24 additions & 170 deletions cmd/workflow/pause/pause.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
package pause

import (
"encoding/hex"
"fmt"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/rs/zerolog"
"github.com/spf13/cobra"
"github.com/spf13/viper"

workflow_registry_v2_wrapper "github.com/smartcontractkit/chainlink-evm/gethwrappers/workflow/generated/workflow_registry_wrapper_v2"

"github.com/smartcontractkit/cre-cli/cmd/client"
cmdCommon "github.com/smartcontractkit/cre-cli/cmd/common"
"github.com/smartcontractkit/cre-cli/internal/environments"
"github.com/smartcontractkit/cre-cli/internal/runtime"
"github.com/smartcontractkit/cre-cli/internal/settings"
"github.com/smartcontractkit/cre-cli/internal/types"
"github.com/smartcontractkit/cre-cli/internal/ui"
"github.com/smartcontractkit/cre-cli/internal/validation"
)
Expand All @@ -29,10 +20,8 @@ const (
)

type Inputs struct {
WorkflowName string `validate:"workflow_name"`
WorkflowOwner string `validate:"workflow_owner"`
WorkflowRegistryContractAddress string `validate:"required"`
WorkflowRegistryContractChainName string `validate:"required"`
WorkflowName string `validate:"workflow_name"`
WorkflowOwner string `validate:"workflow_owner"`
}

func New(runtimeContext *runtime.Context) *cobra.Command {
Expand Down Expand Up @@ -69,13 +58,9 @@ type handler struct {
settings *settings.Settings
environmentSet *environments.EnvironmentSet
inputs Inputs
wrc *client.WorkflowRegistryV2Client
runtimeContext *runtime.Context

validated bool

wg sync.WaitGroup
wrcErr error
}

func newHandler(ctx *runtime.Context) *handler {
Expand All @@ -86,33 +71,20 @@ func newHandler(ctx *runtime.Context) *handler {
environmentSet: ctx.EnvironmentSet,
runtimeContext: ctx,
validated: false,
wg: sync.WaitGroup{},
wrcErr: nil,
}
h.wg.Add(1)
go func() {
defer h.wg.Done()
wrc, err := h.clientFactory.NewWorkflowRegistryV2Client()
if err != nil {
h.wrcErr = fmt.Errorf("failed to create workflow registry client: %w", err)
return
}
h.wrc = wrc
}()

return &h
}

func (h *handler) ResolveInputs(v *viper.Viper) (Inputs, error) {
oc, err := settings.AsOnChain(h.runtimeContext.ResolvedRegistry, "pause")
resolvedWorkflowOwner, err := h.resolveWorkflowOwner(h.runtimeContext.ResolvedRegistry.Type())
if err != nil {
return Inputs{}, err
return Inputs{}, fmt.Errorf("failed to resolve workflow owner: %w", err)
}

return Inputs{
WorkflowName: h.settings.Workflow.UserWorkflowSettings.WorkflowName,
WorkflowOwner: h.settings.Workflow.UserWorkflowSettings.WorkflowOwnerAddress,
WorkflowRegistryContractChainName: oc.ChainName(),
WorkflowRegistryContractAddress: oc.Address(),
WorkflowName: h.settings.Workflow.UserWorkflowSettings.WorkflowName,
WorkflowOwner: resolvedWorkflowOwner,
}, nil
}

Expand All @@ -135,158 +107,40 @@ func (h *handler) Execute() error {
return fmt.Errorf("handler inputs not validated")
}

workflowName := h.inputs.WorkflowName
workflowOwner := common.HexToAddress(h.inputs.WorkflowOwner)

h.displayWorkflowDetails()

h.wg.Wait()
if h.wrcErr != nil {
return h.wrcErr
}

ui.Dim(fmt.Sprintf("Fetching workflows to pause... Name=%s, Owner=%s", workflowName, workflowOwner.Hex()))

workflows, err := fetchAllWorkflows(h.wrc, workflowOwner, workflowName)
strategy, err := newRegistryPauseStrategy(h.runtimeContext.ResolvedRegistry, h)
if err != nil {
return fmt.Errorf("failed to list workflows: %w", err)
}
if len(workflows) == 0 {
return fmt.Errorf("no workflows found for name %q and owner %q", workflowName, workflowOwner.Hex())
return err
}

// Validate precondition: only pause workflows that are currently active
var activeWorkflowIDs [][32]byte
for _, workflow := range workflows {
if workflow.Status == WorkflowStatusActive {
activeWorkflowIDs = append(activeWorkflowIDs, workflow.WorkflowId)
}
}
return strategy.Pause()
}

if len(activeWorkflowIDs) == 0 {
return fmt.Errorf("workflow is already paused, cancelling transaction")
// resolveWorkflowOwner returns the effective owner address for workflow ID computation.
// For private registry deploys, the derived workflow owner from the runtime context is used.
// For onchain deploys, the configured WorkflowOwner address is used directly.
func (h *handler) resolveWorkflowOwner(registryType settings.RegistryType) (string, error) {
if registryType != settings.RegistryTypeOffChain {
return h.settings.Workflow.UserWorkflowSettings.WorkflowOwnerAddress, nil
}

// Note: The way deploy is set up, there will only ever be one workflow in the command for now
h.runtimeContext.Workflow.ID = hex.EncodeToString(activeWorkflowIDs[0][:])

ui.Dim(fmt.Sprintf("Processing batch pause... count=%d", len(activeWorkflowIDs)))

txOut, err := h.wrc.BatchPauseWorkflows(activeWorkflowIDs)
if err != nil {
return fmt.Errorf("failed to batch pause workflows: %w", err)
owner := h.runtimeContext.DerivedWorkflowOwner
if owner == "" {
return "", fmt.Errorf("derived workflow owner is not available; ensure authentication succeeded")
}

oc, _ := h.runtimeContext.ResolvedRegistry.(*settings.OnChainRegistry)

switch txOut.Type {
case client.Regular:
ui.Success("Transaction confirmed")
ui.URL(fmt.Sprintf("%s/tx/%s", oc.ExplorerURL(), txOut.Hash))
ui.Success("Workflows paused successfully")
ui.Line()
ui.Bold("Details:")
ui.Dim(fmt.Sprintf(" Contract address: %s", oc.Address()))
ui.Dim(fmt.Sprintf(" Transaction hash: %s", txOut.Hash))
ui.Dim(fmt.Sprintf(" Workflow Name: %s", workflowName))
for _, w := range activeWorkflowIDs {
ui.Dim(fmt.Sprintf(" Workflow ID: %s", hex.EncodeToString(w[:])))
}

case client.Raw:
ui.Line()
ui.Success("MSIG workflow pause transaction prepared!")
ui.Dim(fmt.Sprintf("To Pause %s", workflowName))
ui.Line()
ui.Bold("Next steps:")
ui.Line()
ui.Print(" 1. Submit the following transaction on the target chain:")
ui.Dim(fmt.Sprintf(" Chain: %s", h.inputs.WorkflowRegistryContractChainName))
ui.Dim(fmt.Sprintf(" Contract Address: %s", txOut.RawTx.To))
ui.Line()
ui.Print(" 2. Use the following transaction data:")
ui.Line()
ui.Code(fmt.Sprintf(" %x", txOut.RawTx.Data))
ui.Line()

case client.Changeset:
chainSelector, err := settings.GetChainSelectorByChainName(oc.ChainName())
if err != nil {
return fmt.Errorf("failed to get chain selector for chain %q: %w", oc.ChainName(), err)
}
mcmsConfig, err := settings.GetMCMSConfig(h.settings, chainSelector)
if err != nil {
ui.Warning("MCMS config not found or is incorrect, skipping MCMS config in changeset")
}
cldSettings := h.settings.CLDSettings
changesets := []types.Changeset{
{
BatchPauseWorkflow: &types.BatchPauseWorkflow{
Payload: types.UserWorkflowBatchPauseInput{
WorkflowIDs: h.runtimeContext.Workflow.ID, // Note: The way deploy is set up, there will only ever be one workflow in the command for now

ChainSelector: chainSelector,
MCMSConfig: mcmsConfig,
WorkflowRegistryQualifier: cldSettings.WorkflowRegistryQualifier,
},
},
},
}
csFile := types.NewChangesetFile(cldSettings.Environment, cldSettings.Domain, cldSettings.MergeProposals, changesets)

var fileName string
if cldSettings.ChangesetFile != "" {
fileName = cldSettings.ChangesetFile
} else {
fileName = fmt.Sprintf("BatchPauseWorkflow_%s_%s.yaml", workflowName, time.Now().Format("20060102_150405"))
}

return cmdCommon.WriteChangesetFile(fileName, csFile, h.settings)

default:
h.log.Warn().Msgf("Unsupported transaction type: %s", txOut.Type)
}
return nil
}

func fetchAllWorkflows(
wrc interface {
GetWorkflowListByOwnerAndName(owner common.Address, workflowName string, start, limit *big.Int) ([]workflow_registry_v2_wrapper.WorkflowRegistryWorkflowMetadataView, error)
},
owner common.Address,
name string,
) ([]workflow_registry_v2_wrapper.WorkflowRegistryWorkflowMetadataView, error) {
const pageSize = int64(200)
var (
start = big.NewInt(0)
limit = big.NewInt(pageSize)
workflows = make([]workflow_registry_v2_wrapper.WorkflowRegistryWorkflowMetadataView, 0, pageSize)
)

for {
list, err := wrc.GetWorkflowListByOwnerAndName(owner, name, start, limit)
if err != nil {
return nil, err
}
if len(list) == 0 {
break
}

workflows = append(workflows, list...)

start = big.NewInt(start.Int64() + int64(len(list)))
if int64(len(list)) < pageSize {
break
}
if len(owner) >= 2 && owner[:2] != "0x" {
owner = "0x" + owner
}

return workflows, nil
return owner, nil
}

func (h *handler) displayWorkflowDetails() {
ui.Line()
ui.Title(fmt.Sprintf("Pausing Workflow: %s", h.inputs.WorkflowName))
ui.Dim(fmt.Sprintf("Target: %s", h.settings.User.TargetName))
ui.Dim(fmt.Sprintf("Owner Address: %s", h.settings.Workflow.UserWorkflowSettings.WorkflowOwnerAddress))
ui.Dim(fmt.Sprintf("Owner Address: %s", h.inputs.WorkflowOwner))
ui.Line()
}
6 changes: 0 additions & 6 deletions cmd/workflow/pause/pause_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ func TestWorkflowPauseCommand(t *testing.T) {
t.Parallel()

validRequired := func(in Inputs) Inputs {
if in.WorkflowRegistryContractAddress == "" {
in.WorkflowRegistryContractAddress = "0x0000000000000000000000000000000000000000"
}
if in.WorkflowRegistryContractChainName == "" {
in.WorkflowRegistryContractChainName = "ethereum-testnet-sepolia"
}
return in
}

Expand Down
18 changes: 18 additions & 0 deletions cmd/workflow/pause/registry_pause_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package pause

import (
"github.com/smartcontractkit/cre-cli/internal/settings"
)

// registryPauseStrategy encapsulates target-specific pause logic.
type registryPauseStrategy interface {
Pause() error
}

// newRegistryPauseStrategy returns the appropriate strategy for the given target.
func newRegistryPauseStrategy(resolvedRegistry settings.ResolvedRegistry, h *handler) (registryPauseStrategy, error) {
if resolvedRegistry.Type() == settings.RegistryTypeOffChain {
return newPrivateRegistryPauseStrategy(h), nil
}
return newOnchainRegistryPauseStrategy(h)
}
Loading
Loading