Skip to content

Commit cb4f9fa

Browse files
committed
fix(socket): sync deploy button state across collaborators
Broadcast workflow-deployed events via socket so all connected users invalidate their deployment query cache when any user deploys, undeploys, activates a version, or triggers a deploy through chat/form endpoints.
1 parent c06361b commit cb4f9fa

10 files changed

Lines changed: 118 additions & 1 deletion

File tree

apps/sim/app/api/chat/manage/[id]/route.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { getSession } from '@/lib/auth'
99
import { isDev } from '@/lib/core/config/feature-flags'
1010
import { encryptSecret } from '@/lib/core/security/encryption'
1111
import { getEmailDomain } from '@/lib/core/utils/urls'
12-
import { performChatUndeploy } from '@/lib/workflows/orchestration'
12+
import { notifySocketDeploymentChanged, performChatUndeploy } from '@/lib/workflows/orchestration'
1313
import { deployWorkflow } from '@/lib/workflows/persistence/utils'
1414
import { checkChatAccess } from '@/app/api/chat/utils'
1515
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
@@ -155,6 +155,7 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
155155
logger.info(
156156
`Redeployed workflow ${existingChat[0].workflowId} for chat update (v${deployResult.version})`
157157
)
158+
await notifySocketDeploymentChanged(existingChat[0].workflowId)
158159
}
159160

160161
let encryptedPassword

apps/sim/app/api/form/route.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { isDev } from '@/lib/core/config/feature-flags'
1010
import { encryptSecret } from '@/lib/core/security/encryption'
1111
import { getEmailDomain } from '@/lib/core/utils/urls'
1212
import { generateId } from '@/lib/core/utils/uuid'
13+
import { notifySocketDeploymentChanged } from '@/lib/workflows/orchestration'
1314
import { deployWorkflow } from '@/lib/workflows/persistence/utils'
1415
import {
1516
checkWorkflowAccessForFormCreation,
@@ -152,6 +153,8 @@ export async function POST(request: NextRequest) {
152153
`${workflowRecord.isDeployed ? 'Redeployed' : 'Auto-deployed'} workflow ${workflowId} for form (v${result.version})`
153154
)
154155

156+
await notifySocketDeploymentChanged(workflowId)
157+
155158
let encryptedPassword = null
156159
if (authType === 'password' && password) {
157160
const { encrypted } = await encryptSecret(password)

apps/sim/app/workspace/providers/socket-provider.tsx

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ interface SocketContextType {
102102
onWorkflowDeleted: (handler: (data: any) => void) => void
103103
onWorkflowReverted: (handler: (data: any) => void) => void
104104
onWorkflowUpdated: (handler: (data: any) => void) => void
105+
onWorkflowDeployed: (handler: (data: any) => void) => void
105106
onOperationConfirmed: (handler: (data: any) => void) => void
106107
onOperationFailed: (handler: (data: any) => void) => void
107108
}
@@ -132,6 +133,7 @@ const SocketContext = createContext<SocketContextType>({
132133
onWorkflowDeleted: () => {},
133134
onWorkflowReverted: () => {},
134135
onWorkflowUpdated: () => {},
136+
onWorkflowDeployed: () => {},
135137
onOperationConfirmed: () => {},
136138
onOperationFailed: () => {},
137139
})
@@ -176,6 +178,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
176178
workflowDeleted?: (data: any) => void
177179
workflowReverted?: (data: any) => void
178180
workflowUpdated?: (data: any) => void
181+
workflowDeployed?: (data: any) => void
179182
operationConfirmed?: (data: any) => void
180183
operationFailed?: (data: any) => void
181184
}>({})
@@ -550,6 +553,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
550553
eventHandlers.current.workflowUpdated?.(data)
551554
})
552555

556+
socketInstance.on('workflow-deployed', (data) => {
557+
logger.info(`Workflow ${data.workflowId} deployment state changed`)
558+
eventHandlers.current.workflowDeployed?.(data)
559+
})
560+
553561
const rehydrateWorkflowStores = async (workflowId: string, workflowState: any) => {
554562
const [
555563
{ useOperationQueueStore },
@@ -994,6 +1002,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
9941002
eventHandlers.current.workflowUpdated = handler
9951003
}, [])
9961004

1005+
const onWorkflowDeployed = useCallback((handler: (data: any) => void) => {
1006+
eventHandlers.current.workflowDeployed = handler
1007+
}, [])
1008+
9971009
const onOperationConfirmed = useCallback((handler: (data: any) => void) => {
9981010
eventHandlers.current.operationConfirmed = handler
9991011
}, [])
@@ -1029,6 +1041,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
10291041
onWorkflowDeleted,
10301042
onWorkflowReverted,
10311043
onWorkflowUpdated,
1044+
onWorkflowDeployed,
10321045
onOperationConfirmed,
10331046
onOperationFailed,
10341047
}),
@@ -1058,6 +1071,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
10581071
onWorkflowDeleted,
10591072
onWorkflowReverted,
10601073
onWorkflowUpdated,
1074+
onWorkflowDeployed,
10611075
onOperationConfirmed,
10621076
onOperationFailed,
10631077
]

apps/sim/hooks/use-collaborative-workflow.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import { useCallback, useEffect, useRef } from 'react'
22
import { createLogger } from '@sim/logger'
3+
import { useQueryClient } from '@tanstack/react-query'
34
import type { Edge } from 'reactflow'
45
import { useShallow } from 'zustand/react/shallow'
56
import { useSession } from '@/lib/auth/auth-client'
67
import { generateId } from '@/lib/core/utils/uuid'
78
import { useSocket } from '@/app/workspace/providers/socket-provider'
89
import { getBlock } from '@/blocks'
910
import { normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
11+
import { invalidateDeploymentQueries } from '@/hooks/queries/deployments'
1012
import { useUndoRedo } from '@/hooks/use-undo-redo'
1113
import {
1214
BLOCK_OPERATIONS,
@@ -34,6 +36,7 @@ import { findAllDescendantNodes, isBlockProtected } from '@/stores/workflows/wor
3436
const logger = createLogger('CollaborativeWorkflow')
3537

3638
export function useCollaborativeWorkflow() {
39+
const queryClient = useQueryClient()
3740
const undoRedo = useUndoRedo()
3841
const isUndoRedoInProgress = useRef(false)
3942
const lastDiffOperationId = useRef<string | null>(null)
@@ -125,6 +128,7 @@ export function useCollaborativeWorkflow() {
125128
onWorkflowDeleted,
126129
onWorkflowReverted,
127130
onWorkflowUpdated,
131+
onWorkflowDeployed,
128132
onOperationConfirmed,
129133
onOperationFailed,
130134
} = useSocket()
@@ -645,6 +649,15 @@ export function useCollaborativeWorkflow() {
645649
}
646650
}
647651

652+
const handleWorkflowDeployed = (data: any) => {
653+
const { workflowId } = data
654+
logger.info(`Workflow ${workflowId} deployment state changed`)
655+
656+
if (workflowId !== activeWorkflowId) return
657+
658+
invalidateDeploymentQueries(queryClient, workflowId)
659+
}
660+
648661
const handleOperationConfirmed = (data: any) => {
649662
const { operationId } = data
650663
logger.debug('Operation confirmed', { operationId })
@@ -664,6 +677,7 @@ export function useCollaborativeWorkflow() {
664677
onWorkflowDeleted(handleWorkflowDeleted)
665678
onWorkflowReverted(handleWorkflowReverted)
666679
onWorkflowUpdated(handleWorkflowUpdated)
680+
onWorkflowDeployed(handleWorkflowDeployed)
667681
onOperationConfirmed(handleOperationConfirmed)
668682
onOperationFailed(handleOperationFailed)
669683
}, [
@@ -673,9 +687,11 @@ export function useCollaborativeWorkflow() {
673687
onWorkflowDeleted,
674688
onWorkflowReverted,
675689
onWorkflowUpdated,
690+
onWorkflowDeployed,
676691
onOperationConfirmed,
677692
onOperationFailed,
678693
activeWorkflowId,
694+
queryClient,
679695
confirmOperation,
680696
failOperation,
681697
emitWorkflowOperation,

apps/sim/lib/workflows/orchestration/deploy.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,26 @@ import type { WorkflowState } from '@/stores/workflows/workflow/types'
3131

3232
const logger = createLogger('DeployOrchestration')
3333

34+
/**
35+
* Notifies the socket server that a workflow's deployment state has changed,
36+
* so all connected clients can refresh their deployment queries.
37+
*/
38+
export async function notifySocketDeploymentChanged(workflowId: string): Promise<void> {
39+
try {
40+
const socketServerUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
41+
await fetch(`${socketServerUrl}/api/workflow-deployed`, {
42+
method: 'POST',
43+
headers: {
44+
'Content-Type': 'application/json',
45+
'x-api-key': env.INTERNAL_API_SECRET,
46+
},
47+
body: JSON.stringify({ workflowId }),
48+
})
49+
} catch (error) {
50+
logger.error('Error sending workflow deployed event to socket server', error)
51+
}
52+
}
53+
3454
export interface PerformFullDeployParams {
3555
workflowId: string
3656
userId: string
@@ -222,6 +242,8 @@ export async function performFullDeploy(
222242
request,
223243
})
224244

245+
await notifySocketDeploymentChanged(workflowId)
246+
225247
return {
226248
success: true,
227249
deployedAt,
@@ -296,6 +318,8 @@ export async function performFullUndeploy(
296318
description: `Undeployed workflow "${(workflowData.name as string) || workflowId}"`,
297319
})
298320

321+
await notifySocketDeploymentChanged(workflowId)
322+
299323
return { success: true }
300324
}
301325

@@ -509,6 +533,8 @@ export async function performActivateVersion(
509533
},
510534
})
511535

536+
await notifySocketDeploymentChanged(workflowId)
537+
512538
return {
513539
success: true,
514540
deployedAt: result.deployedAt,

apps/sim/lib/workflows/orchestration/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export {
1515
type PerformFullUndeployResult,
1616
type PerformRevertToVersionParams,
1717
type PerformRevertToVersionResult,
18+
notifySocketDeploymentChanged,
1819
performActivateVersion,
1920
performFullDeploy,
2021
performFullUndeploy,

apps/sim/socket/rooms/memory-manager.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,4 +238,21 @@ export class MemoryRoomManager implements IRoomManager {
238238

239239
logger.info(`Notified ${room.users.size} users about workflow update: ${workflowId}`)
240240
}
241+
242+
async handleWorkflowDeployed(workflowId: string): Promise<void> {
243+
logger.info(`Handling workflow deployed notification for ${workflowId}`)
244+
245+
const room = this.workflowRooms.get(workflowId)
246+
if (!room) {
247+
logger.debug(`No active room found for deployed workflow ${workflowId}`)
248+
return
249+
}
250+
251+
this._io.to(workflowId).emit('workflow-deployed', {
252+
workflowId,
253+
timestamp: Date.now(),
254+
})
255+
256+
logger.info(`Notified ${room.users.size} users about workflow deployment change: ${workflowId}`)
257+
}
241258
}

apps/sim/socket/rooms/redis-manager.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,4 +439,24 @@ export class RedisRoomManager implements IRoomManager {
439439
const userCount = await this.getUniqueUserCount(workflowId)
440440
logger.info(`Notified ${userCount} users about workflow update: ${workflowId}`)
441441
}
442+
443+
async handleWorkflowDeployed(workflowId: string): Promise<void> {
444+
logger.info(`Handling workflow deployed notification for ${workflowId}`)
445+
446+
const hasRoom = await this.hasWorkflowRoom(workflowId)
447+
if (!hasRoom) {
448+
logger.debug(`No active room found for deployed workflow ${workflowId}`)
449+
return
450+
}
451+
452+
this._io.to(workflowId).emit('workflow-deployed', {
453+
workflowId,
454+
timestamp: Date.now(),
455+
})
456+
457+
const userCount = await this.getUniqueUserCount(workflowId)
458+
logger.info(
459+
`Notified ${userCount} users about workflow deployment change: ${workflowId}`
460+
)
461+
}
442462
}

apps/sim/socket/rooms/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,4 +138,9 @@ export interface IRoomManager {
138138
* Handle workflow update - notify users
139139
*/
140140
handleWorkflowUpdate(workflowId: string): Promise<void>
141+
142+
/**
143+
* Handle workflow deployment change - notify users to refresh deployment state
144+
*/
145+
handleWorkflowDeployed(workflowId: string): Promise<void>
141146
}

apps/sim/socket/routes/http.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,20 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
122122
return
123123
}
124124

125+
// Handle workflow deployment change notifications from the main API
126+
if (req.method === 'POST' && req.url === '/api/workflow-deployed') {
127+
try {
128+
const body = await readRequestBody(req)
129+
const { workflowId } = JSON.parse(body)
130+
await roomManager.handleWorkflowDeployed(workflowId)
131+
sendSuccess(res)
132+
} catch (error) {
133+
logger.error('Error handling workflow deployed notification:', error)
134+
sendError(res, 'Failed to process deployment notification')
135+
}
136+
return
137+
}
138+
125139
// Handle workflow revert notifications from the main API
126140
if (req.method === 'POST' && req.url === '/api/workflow-reverted') {
127141
try {

0 commit comments

Comments
 (0)