diff --git a/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStream.java b/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStream.java new file mode 100644 index 000000000..7a77357b9 --- /dev/null +++ b/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStream.java @@ -0,0 +1,276 @@ +package com.slack.api.methods; + +import com.slack.api.methods.response.chat.ChatAppendStreamResponse; +import com.slack.api.methods.response.chat.ChatStartStreamResponse; +import com.slack.api.methods.response.chat.ChatStopStreamResponse; +import com.slack.api.model.Message; +import com.slack.api.model.block.LayoutBlock; +import lombok.Builder; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** + * Async variant of {@link ChatStream} for {@link AsyncMethodsClient}. + *

+ * This class buffers markdown text and flushes via chat.startStream / chat.appendStream, then finalizes via + * chat.stopStream. Access to the mutable stream state is synchronized by this class. + *

+ * Calls that perform Slack API requests are chained in invocation order. If a queued append or stop operation fails, + * any markdown text drained for that operation is restored to the buffer before the next queued operation starts. + * While stop is pending, new append calls are rejected. + */ +@Slf4j +public class AsyncChatStream { + + private final String channel; + private final String threadTs; + private final String recipientTeamId; + private final String recipientUserId; + private final AsyncChatStreamProtocol protocol; + + private final ChatStreamBuffer buffer; + private final ChatStreamLifecycle lifecycle = new ChatStreamLifecycle(); + private final AsyncChatStreamQueue queue = new AsyncChatStreamQueue(); + + @Builder + private AsyncChatStream( + AsyncMethodsClient client, + String channel, + String threadTs, + String recipientTeamId, + String recipientUserId, + Integer bufferSize + ) { + this.channel = channel; + this.threadTs = threadTs; + this.recipientTeamId = recipientTeamId; + this.recipientUserId = recipientUserId; + this.protocol = new AsyncChatStreamProtocol(client, channel, threadTs, recipientTeamId, recipientUserId); + this.buffer = new ChatStreamBuffer(bufferSize != null ? bufferSize : 256); + } + + public int getBufferSize() { + return buffer.getBufferSize(); + } + + public synchronized String getStreamTs() { + return lifecycle.getStreamTs(); + } + + /** + * Append text to the stream. + * + * @param markdownText markdown text to append + * @return a future that completes with a response if the buffer was flushed; completes with null if buffering + * @throws NullPointerException if markdownText is null + */ + public CompletableFuture append(String markdownText) { + Objects.requireNonNull(markdownText, "markdownText"); + synchronized (this) { + if (lifecycle.getState() == ChatStreamLifecycle.StreamState.COMPLETED || queue.isStopRequested()) { + return failedFuture(new SlackChatStreamException("Cannot append to stream: stream state is " + lifecycle.getState())); + } + + buffer.append(markdownText); + + if (buffer.isReadyToFlush()) { + return enqueueFlushBuffer(); + } + + if (log.isDebugEnabled()) { + log.debug("AsyncChatStream appended to buffer: bufferLength={}, bufferSize={}, channel={}, " + + "recipientTeamId={}, recipientUserId={}, threadTs={}", + buffer.length(), getBufferSize(), channel, recipientTeamId, recipientUserId, threadTs); + } + return CompletableFuture.completedFuture(null); + } + } + + public CompletableFuture stop() { + return stop(null, null, null); + } + + public CompletableFuture stop(String markdownText) { + return stop(markdownText, null, null); + } + + /** + * Stop the stream and finalize the message. Any buffered text is sent with the stop request after previously + * queued API calls finish. + * + * @param markdownText Additional text to append before stopping (can be null) + * @param blocks A list of blocks that will be rendered at the bottom of the finalized message (can be null) + * @param metadata Metadata to attach to the message (can be null) + * @return a future that completes with the chat.stopStream API response + */ + public CompletableFuture stop( + String markdownText, + List blocks, + Message.Metadata metadata + ) { + synchronized (this) { + if (lifecycle.getState() == ChatStreamLifecycle.StreamState.COMPLETED || queue.isStopRequested()) { + return failedFuture(new SlackChatStreamException("Cannot stop stream: stream state is " + lifecycle.getState())); + } + + buffer.appendIfNotNull(markdownText); + + return queue.enqueueStop(() -> stopCurrentBuffer(blocks, metadata)); + } + } + + private CompletableFuture enqueueFlushBuffer() { + return queue.enqueueFlush(() -> flushCurrentBuffer()); + } + + private CompletableFuture flushCurrentBuffer() { + String markdownText; + synchronized (this) { + markdownText = buffer.drain(); + } + if (markdownText.length() == 0) { + return CompletableFuture.completedFuture(null); + } + return flushBuffer(markdownText).handle((resp, error) -> { + if (error != null) { + synchronized (AsyncChatStream.this) { + buffer.restore(markdownText); + } + throw propagate(error); + } + return resp; + }); + } + + private CompletableFuture stopCurrentBuffer( + List blocks, + Message.Metadata metadata + ) { + String markdownText; + synchronized (this) { + markdownText = buffer.drain(); + } + return stopStream(markdownText, blocks, metadata).handle((resp, error) -> { + if (error != null) { + synchronized (AsyncChatStream.this) { + buffer.restore(markdownText); + queue.clearStopRequested(); + } + throw propagate(error); + } + return resp; + }); + } + + private static RuntimeException propagate(Throwable cause) { + if (cause instanceof CompletionException && cause.getCause() != null) { + cause = cause.getCause(); + } + if (cause instanceof RuntimeException) { + return (RuntimeException) cause; + } + return new CompletionException(cause); + } + + private CompletableFuture flushBuffer(String markdownText) { + String ts; + synchronized (this) { + ts = lifecycle.getStreamTs(); + } + if (ts == null) { + return protocol.startStream(markdownText) + .thenApply(startResponse -> { + if (!startResponse.isOk() || startResponse.getTs() == null) { + SlackChatStreamException ex = new SlackChatStreamException( + "Failed to start stream: " + startResponse.getError()); + ex.setStartResponse(startResponse); + throw ex; + } + synchronized (this) { + lifecycle.markStarted(startResponse.getTs()); + } + return toAppendResponse(startResponse); + }); + } else { + return protocol.appendStream(ts, markdownText) + .thenApply(resp -> { + if (!resp.isOk()) { + SlackChatStreamException ex = new SlackChatStreamException( + "Failed to append to stream: " + resp.getError()); + ex.getAppendResponses().add(resp); + throw ex; + } + return resp; + }); + } + } + + private CompletableFuture stopStream( + String markdownText, + List blocks, + Message.Metadata metadata + ) { + return ensureStartedForStop().thenCompose(ignored -> { + String ts; + synchronized (this) { + ts = lifecycle.getStreamTs(); + } + return protocol.stopStream(ts, markdownText, blocks, metadata) + .thenApply(resp -> { + if (!resp.isOk()) { + SlackChatStreamException ex = new SlackChatStreamException( + "Failed to stop stream: " + resp.getError()); + ex.setStopResponse(resp); + throw ex; + } + synchronized (this) { + lifecycle.markCompleted(); + } + return resp; + }); + }); + } + + private CompletableFuture ensureStartedForStop() { + String ts; + synchronized (this) { + ts = lifecycle.getStreamTs(); + } + if (ts != null) { + return CompletableFuture.completedFuture(null); + } + return protocol.startStream(null) + .thenApply(startResponse -> { + if (!startResponse.isOk() || startResponse.getTs() == null) { + SlackChatStreamException ex = new SlackChatStreamException( + "Failed to stop stream: stream not started - " + startResponse.getError()); + ex.setStartResponse(startResponse); + throw ex; + } + synchronized (this) { + lifecycle.markStarted(startResponse.getTs()); + } + return null; + }); + } + + private ChatAppendStreamResponse toAppendResponse(ChatStartStreamResponse startResponse) { + ChatAppendStreamResponse response = new ChatAppendStreamResponse(); + response.setOk(startResponse.isOk()); + response.setChannel(startResponse.getChannel()); + response.setTs(startResponse.getTs()); + response.setWarning(startResponse.getWarning()); + response.setError(startResponse.getError()); + return response; + } + + private static CompletableFuture failedFuture(Throwable cause) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(cause); + return future; + } +} diff --git a/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStreamProtocol.java b/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStreamProtocol.java new file mode 100644 index 000000000..7ccecfad5 --- /dev/null +++ b/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStreamProtocol.java @@ -0,0 +1,69 @@ +package com.slack.api.methods; + +import com.slack.api.methods.request.chat.ChatAppendStreamRequest; +import com.slack.api.methods.request.chat.ChatStartStreamRequest; +import com.slack.api.methods.request.chat.ChatStopStreamRequest; +import com.slack.api.methods.response.chat.ChatAppendStreamResponse; +import com.slack.api.methods.response.chat.ChatStartStreamResponse; +import com.slack.api.methods.response.chat.ChatStopStreamResponse; +import com.slack.api.model.Message; +import com.slack.api.model.block.LayoutBlock; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +class AsyncChatStreamProtocol { + + private final AsyncMethodsClient client; + private final String channel; + private final String threadTs; + private final String recipientTeamId; + private final String recipientUserId; + + AsyncChatStreamProtocol( + AsyncMethodsClient client, + String channel, + String threadTs, + String recipientTeamId, + String recipientUserId + ) { + this.client = client; + this.channel = channel; + this.threadTs = threadTs; + this.recipientTeamId = recipientTeamId; + this.recipientUserId = recipientUserId; + } + + CompletableFuture startStream(String markdownText) { + return client.chatStartStream(ChatStartStreamRequest.builder() + .channel(channel) + .threadTs(threadTs) + .recipientTeamId(recipientTeamId) + .recipientUserId(recipientUserId) + .markdownText(markdownText) + .build()); + } + + CompletableFuture appendStream(String streamTs, String markdownText) { + return client.chatAppendStream(ChatAppendStreamRequest.builder() + .channel(channel) + .ts(streamTs) + .markdownText(markdownText) + .build()); + } + + CompletableFuture stopStream( + String streamTs, + String markdownText, + List blocks, + Message.Metadata metadata + ) { + return client.chatStopStream(ChatStopStreamRequest.builder() + .channel(channel) + .ts(streamTs) + .markdownText(markdownText) + .blocks(blocks) + .metadata(metadata) + .build()); + } +} diff --git a/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStreamQueue.java b/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStreamQueue.java new file mode 100644 index 000000000..a0e015664 --- /dev/null +++ b/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStreamQueue.java @@ -0,0 +1,65 @@ +package com.slack.api.methods; + +import com.slack.api.methods.response.chat.ChatAppendStreamResponse; +import com.slack.api.methods.response.chat.ChatStopStreamResponse; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +class AsyncChatStreamQueue { + + private CompletableFuture pending = CompletableFuture.completedFuture(null); + private boolean flushQueued; + private CompletableFuture queuedFlush; + private boolean stopRequested; + + synchronized boolean isStopRequested() { + return stopRequested; + } + + synchronized void clearStopRequested() { + stopRequested = false; + } + + CompletableFuture enqueueFlush( + Supplier> work + ) { + CompletableFuture currentPending; + synchronized (this) { + if (flushQueued) { + return queuedFlush; + } + flushQueued = true; + currentPending = pending; + } + CompletableFuture result = currentPending.thenCompose(ignored -> { + clearQueuedFlush(); + return work.get(); + }); + synchronized (this) { + queuedFlush = result; + pending = result.handle((resp, error) -> null); + } + return result; + } + + CompletableFuture enqueueStop( + Supplier> work + ) { + CompletableFuture currentPending; + synchronized (this) { + stopRequested = true; + currentPending = pending; + } + CompletableFuture result = currentPending.thenCompose(ignored -> work.get()); + synchronized (this) { + pending = result.handle((resp, error) -> null); + } + return result; + } + + private synchronized void clearQueuedFlush() { + flushQueued = false; + queuedFlush = null; + } +} diff --git a/slack-api-client/src/main/java/com/slack/api/methods/AsyncMethodsClient.java b/slack-api-client/src/main/java/com/slack/api/methods/AsyncMethodsClient.java index 4d9b57ead..d2b1f3702 100644 --- a/slack-api-client/src/main/java/com/slack/api/methods/AsyncMethodsClient.java +++ b/slack-api-client/src/main/java/com/slack/api/methods/AsyncMethodsClient.java @@ -1027,6 +1027,8 @@ CompletableFuture CompletableFuture chatStopStream(RequestConfigurator req); + AsyncChatStream chatStream(RequestConfigurator req); + CompletableFuture chatUpdate(ChatUpdateRequest req); CompletableFuture chatUpdate(RequestConfigurator req); diff --git a/slack-api-client/src/main/java/com/slack/api/methods/ChatStream.java b/slack-api-client/src/main/java/com/slack/api/methods/ChatStream.java new file mode 100644 index 000000000..25d555ec4 --- /dev/null +++ b/slack-api-client/src/main/java/com/slack/api/methods/ChatStream.java @@ -0,0 +1,240 @@ +package com.slack.api.methods; + +import com.slack.api.methods.response.chat.ChatAppendStreamResponse; +import com.slack.api.methods.response.chat.ChatStartStreamResponse; +import com.slack.api.methods.response.chat.ChatStopStreamResponse; +import com.slack.api.model.Message; +import com.slack.api.model.block.LayoutBlock; +import lombok.Builder; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * A class for streaming markdown text into a conversation using the chat streaming APIs. + *

+ * This class provides a convenient interface for the chat.startStream, chat.appendStream, and chat.stopStream API + * methods, with automatic buffering and state management. + *

+ * This class is not thread-safe. Use {@link AsyncChatStream} when appends and stop calls may be invoked from + * multiple threads. + *

+ * Typical usage is to use the {@link MethodsClient#chatStream} method: + * + *

+ * {@code
+ * MethodsClient client = Slack.getInstance().methods(token);
+ * ChatStream stream = client.chatStream(req -> req
+ *     .channel("C0123456789")
+ *     .threadTs("1700000001.123456")
+ *     .recipientTeamId("T0123456789")
+ *     .recipientUserId("U0123456789")
+ *     .bufferSize(100));
+ *
+ * stream.append("**hello wo");
+ * stream.append("rld!**");
+ * ChatStopStreamResponse response = stream.stop();
+ * }
+ * 
+ */ +@Slf4j +public class ChatStream { + + private final String channel; + private final String threadTs; + private final String recipientTeamId; + private final String recipientUserId; + private final ChatStreamProtocol protocol; + + /** + * The length of markdown_text to buffer in-memory before calling a method. + * Increasing this value decreases the number of method calls made for the same amount of text, + * which is useful to avoid rate limits. + * Default is 256. + */ + private final ChatStreamBuffer buffer; + private final ChatStreamLifecycle lifecycle = new ChatStreamLifecycle(); + + @Builder + private ChatStream( + MethodsClient client, + String channel, + String threadTs, + String recipientTeamId, + String recipientUserId, + Integer bufferSize + ) { + this.channel = channel; + this.threadTs = threadTs; + this.recipientTeamId = recipientTeamId; + this.recipientUserId = recipientUserId; + this.protocol = new ChatStreamProtocol(client, channel, threadTs, recipientTeamId, recipientUserId); + this.buffer = new ChatStreamBuffer(bufferSize != null ? bufferSize : 256); + } + + public int getBufferSize() { + return buffer.getBufferSize(); + } + + public String getStreamTs() { + return lifecycle.getStreamTs(); + } + + /** + * Append text to the stream. + *

+ * This method can be called multiple times. After the stream is stopped, this method cannot be called. + * + * @param markdownText Accepts message text formatted in markdown. Limit this field to 12,000 characters. + * This text is what will be appended to the message received so far. + * @return ChatAppendStreamResponse if the buffer was flushed; null if the text was only buffered + * @throws SlackChatStreamException if the stream is already completed or an API error occurs + * @throws IOException if a network error occurs + * @throws SlackApiException if a Slack API error occurs + * @throws NullPointerException if markdownText is null + */ + public ChatAppendStreamResponse append(String markdownText) throws IOException, SlackApiException { + Objects.requireNonNull(markdownText, "markdownText"); + lifecycle.verifyAppendable(); + + buffer.append(markdownText); + + if (buffer.isReadyToFlush()) { + return flushBuffer(); + } + + if (log.isDebugEnabled()) { + log.debug("ChatStream appended to buffer: bufferLength={}, bufferSize={}, channel={}, " + + "recipientTeamId={}, recipientUserId={}, threadTs={}", + buffer.length(), getBufferSize(), channel, recipientTeamId, recipientUserId, threadTs); + } + + return null; + } + + /** + * Stop the stream and finalize the message. + * + * @return ChatStopStreamResponse from the chat.stopStream API call + * @throws SlackChatStreamException if the stream is already completed or an API error occurs + * @throws IOException if a network error occurs + * @throws SlackApiException if a Slack API error occurs + */ + public ChatStopStreamResponse stop() throws IOException, SlackApiException { + return stop(null, null, null); + } + + /** + * Stop the stream and finalize the message. + * + * @param markdownText Additional text to append before stopping + * @return ChatStopStreamResponse from the chat.stopStream API call + * @throws SlackChatStreamException if the stream is already completed or an API error occurs + * @throws IOException if a network error occurs + * @throws SlackApiException if a Slack API error occurs + */ + public ChatStopStreamResponse stop(String markdownText) throws IOException, SlackApiException { + return stop(markdownText, null, null); + } + + /** + * Stop the stream and finalize the message. + * + * @param markdownText Additional text to append before stopping (can be null) + * @param blocks A list of blocks that will be rendered at the bottom of the finalized message (can be null) + * @param metadata Metadata to attach to the message (can be null) + * @return ChatStopStreamResponse from the chat.stopStream API call + * @throws SlackChatStreamException if the stream is already completed or an API error occurs + * @throws IOException if a network error occurs + * @throws SlackApiException if a Slack API error occurs + */ + public ChatStopStreamResponse stop( + String markdownText, + List blocks, + Message.Metadata metadata + ) throws IOException, SlackApiException { + lifecycle.verifyStoppable(); + + buffer.appendIfNotNull(markdownText); + + // If the stream hasn't started yet, start it first + if (!lifecycle.isStarted()) { + ChatStartStreamResponse startResponse = protocol.startStream(null); + + if (!startResponse.isOk() || startResponse.getTs() == null) { + SlackChatStreamException ex = new SlackChatStreamException( + "Failed to stop stream: stream not started - " + startResponse.getError()); + ex.setStartResponse(startResponse); + throw ex; + } + + lifecycle.markStarted(startResponse.getTs()); + } + + ChatStopStreamResponse response = protocol.stopStream( + lifecycle.getStreamTs(), buffer.getMarkdownText(), blocks, metadata); + + if (!response.isOk()) { + SlackChatStreamException ex = new SlackChatStreamException( + "Failed to stop stream: " + response.getError()); + ex.setStopResponse(response); + throw ex; + } + + lifecycle.markCompleted(); + return response; + } + + /** + * Flush the internal buffer by making appropriate API calls. + * + * @return ChatAppendStreamResponse from the API call (or a synthesized response for the first call) + * @throws IOException if a network error occurs + * @throws SlackApiException if a Slack API error occurs + */ + private ChatAppendStreamResponse flushBuffer() throws IOException, SlackApiException { + ChatAppendStreamResponse response; + + if (!lifecycle.isStarted()) { + // First flush - start the stream + ChatStartStreamResponse startResponse = protocol.startStream(buffer.getMarkdownText()); + + if (!startResponse.isOk() || startResponse.getTs() == null) { + SlackChatStreamException ex = new SlackChatStreamException( + "Failed to start stream: " + startResponse.getError()); + ex.setStartResponse(startResponse); + throw ex; + } + + lifecycle.markStarted(startResponse.getTs()); + + response = toAppendResponse(startResponse); + } else { + // Subsequent flush - append to stream + response = protocol.appendStream(lifecycle.getStreamTs(), buffer.getMarkdownText()); + + if (!response.isOk()) { + SlackChatStreamException ex = new SlackChatStreamException( + "Failed to append to stream: " + response.getError()); + ex.getAppendResponses().add(response); + throw ex; + } + } + + // Clear the buffer + buffer.clear(); + return response; + } + + private ChatAppendStreamResponse toAppendResponse(ChatStartStreamResponse startResponse) { + ChatAppendStreamResponse response = new ChatAppendStreamResponse(); + response.setOk(startResponse.isOk()); + response.setChannel(startResponse.getChannel()); + response.setTs(startResponse.getTs()); + response.setWarning(startResponse.getWarning()); + response.setError(startResponse.getError()); + return response; + } +} diff --git a/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamBuffer.java b/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamBuffer.java new file mode 100644 index 000000000..ee145bbd2 --- /dev/null +++ b/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamBuffer.java @@ -0,0 +1,54 @@ +package com.slack.api.methods; + +class ChatStreamBuffer { + + private final int bufferSize; + private final StringBuilder buffer = new StringBuilder(); + + ChatStreamBuffer(int bufferSize) { + if (bufferSize <= 0) { + throw new IllegalArgumentException("bufferSize must be greater than 0"); + } + this.bufferSize = bufferSize; + } + + int getBufferSize() { + return bufferSize; + } + + int length() { + return buffer.length(); + } + + void append(String markdownText) { + buffer.append(markdownText); + } + + void appendIfNotNull(String markdownText) { + if (markdownText != null) { + append(markdownText); + } + } + + boolean isReadyToFlush() { + return length() >= bufferSize; + } + + String getMarkdownText() { + return buffer.toString(); + } + + String drain() { + String markdownText = getMarkdownText(); + clear(); + return markdownText; + } + + void restore(String markdownText) { + buffer.insert(0, markdownText); + } + + void clear() { + buffer.setLength(0); + } +} diff --git a/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamLifecycle.java b/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamLifecycle.java new file mode 100644 index 000000000..77126552d --- /dev/null +++ b/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamLifecycle.java @@ -0,0 +1,46 @@ +package com.slack.api.methods; + +class ChatStreamLifecycle { + + enum StreamState { + STARTING, + IN_PROGRESS, + COMPLETED + } + + private StreamState state = StreamState.STARTING; + private String streamTs; + + String getStreamTs() { + return streamTs; + } + + StreamState getState() { + return state; + } + + boolean isStarted() { + return streamTs != null; + } + + void verifyAppendable() { + if (state == StreamState.COMPLETED) { + throw new SlackChatStreamException("Cannot append to stream: stream state is " + state); + } + } + + void verifyStoppable() { + if (state == StreamState.COMPLETED) { + throw new SlackChatStreamException("Cannot stop stream: stream state is " + state); + } + } + + void markStarted(String streamTs) { + this.streamTs = streamTs; + this.state = StreamState.IN_PROGRESS; + } + + void markCompleted() { + this.state = StreamState.COMPLETED; + } +} diff --git a/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamProtocol.java b/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamProtocol.java new file mode 100644 index 000000000..0994e059a --- /dev/null +++ b/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamProtocol.java @@ -0,0 +1,69 @@ +package com.slack.api.methods; + +import com.slack.api.methods.request.chat.ChatAppendStreamRequest; +import com.slack.api.methods.request.chat.ChatStartStreamRequest; +import com.slack.api.methods.request.chat.ChatStopStreamRequest; +import com.slack.api.methods.response.chat.ChatAppendStreamResponse; +import com.slack.api.methods.response.chat.ChatStartStreamResponse; +import com.slack.api.methods.response.chat.ChatStopStreamResponse; +import com.slack.api.model.Message; +import com.slack.api.model.block.LayoutBlock; + +import java.io.IOException; +import java.util.List; + +class ChatStreamProtocol { + + private final MethodsClient client; + private final String channel; + private final String threadTs; + private final String recipientTeamId; + private final String recipientUserId; + + ChatStreamProtocol( + MethodsClient client, + String channel, + String threadTs, + String recipientTeamId, + String recipientUserId + ) { + this.client = client; + this.channel = channel; + this.threadTs = threadTs; + this.recipientTeamId = recipientTeamId; + this.recipientUserId = recipientUserId; + } + + ChatStartStreamResponse startStream(String markdownText) throws IOException, SlackApiException { + return client.chatStartStream(ChatStartStreamRequest.builder() + .channel(channel) + .threadTs(threadTs) + .recipientTeamId(recipientTeamId) + .recipientUserId(recipientUserId) + .markdownText(markdownText) + .build()); + } + + ChatAppendStreamResponse appendStream(String streamTs, String markdownText) throws IOException, SlackApiException { + return client.chatAppendStream(ChatAppendStreamRequest.builder() + .channel(channel) + .ts(streamTs) + .markdownText(markdownText) + .build()); + } + + ChatStopStreamResponse stopStream( + String streamTs, + String markdownText, + List blocks, + Message.Metadata metadata + ) throws IOException, SlackApiException { + return client.chatStopStream(ChatStopStreamRequest.builder() + .channel(channel) + .ts(streamTs) + .markdownText(markdownText) + .blocks(blocks) + .metadata(metadata) + .build()); + } +} diff --git a/slack-api-client/src/main/java/com/slack/api/methods/MethodsClient.java b/slack-api-client/src/main/java/com/slack/api/methods/MethodsClient.java index 3a4722eb0..50c4c0aa8 100644 --- a/slack-api-client/src/main/java/com/slack/api/methods/MethodsClient.java +++ b/slack-api-client/src/main/java/com/slack/api/methods/MethodsClient.java @@ -1673,6 +1673,8 @@ ChatScheduleMessageResponse chatScheduleMessage( ChatStopStreamResponse chatStopStream(RequestConfigurator req) throws IOException, SlackApiException; + ChatStream chatStream(RequestConfigurator req); + ChatUpdateResponse chatUpdate(ChatUpdateRequest req) throws IOException, SlackApiException; ChatUpdateResponse chatUpdate(RequestConfigurator req) diff --git a/slack-api-client/src/main/java/com/slack/api/methods/SlackChatStreamException.java b/slack-api-client/src/main/java/com/slack/api/methods/SlackChatStreamException.java new file mode 100644 index 000000000..ef43404c3 --- /dev/null +++ b/slack-api-client/src/main/java/com/slack/api/methods/SlackChatStreamException.java @@ -0,0 +1,33 @@ +package com.slack.api.methods; + +import com.slack.api.methods.response.chat.ChatAppendStreamResponse; +import com.slack.api.methods.response.chat.ChatStartStreamResponse; +import com.slack.api.methods.response.chat.ChatStopStreamResponse; +import lombok.Getter; +import lombok.Setter; + +import java.util.ArrayList; +import java.util.List; + +/** + * Represents an error that occurred during chat streaming operations. + */ +public class SlackChatStreamException extends RuntimeException { + + @Getter + @Setter + private ChatStartStreamResponse startResponse; + @Getter + private final List appendResponses = new ArrayList<>(); + @Getter + @Setter + private ChatStopStreamResponse stopResponse; + + public SlackChatStreamException(String message) { + super(message); + } + + public SlackChatStreamException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/slack-api-client/src/main/java/com/slack/api/methods/impl/AsyncMethodsClientImpl.java b/slack-api-client/src/main/java/com/slack/api/methods/impl/AsyncMethodsClientImpl.java index 21e007510..156f87711 100644 --- a/slack-api-client/src/main/java/com/slack/api/methods/impl/AsyncMethodsClientImpl.java +++ b/slack-api-client/src/main/java/com/slack/api/methods/impl/AsyncMethodsClientImpl.java @@ -2,6 +2,7 @@ import com.slack.api.RequestConfigurator; import com.slack.api.SlackConfig; +import com.slack.api.methods.AsyncChatStream; import com.slack.api.methods.AsyncMethodsClient; import com.slack.api.methods.MethodsClient; import com.slack.api.methods.SlackApiRequest; @@ -1753,6 +1754,11 @@ public CompletableFuture chatStopStream(RequestConfigura return chatStopStream(req.configure(ChatStopStreamRequest.builder()).build()); } + @Override + public AsyncChatStream chatStream(RequestConfigurator req) { + return req.configure(AsyncChatStream.builder().client(this)).build(); + } + @Override public CompletableFuture chatUpdate(ChatUpdateRequest req) { return executor.execute(CHAT_UPDATE, toMap(req), () -> methods.chatUpdate(req)); diff --git a/slack-api-client/src/main/java/com/slack/api/methods/impl/MethodsClientImpl.java b/slack-api-client/src/main/java/com/slack/api/methods/impl/MethodsClientImpl.java index f1d26b619..31899c06f 100644 --- a/slack-api-client/src/main/java/com/slack/api/methods/impl/MethodsClientImpl.java +++ b/slack-api-client/src/main/java/com/slack/api/methods/impl/MethodsClientImpl.java @@ -1988,6 +1988,11 @@ public ChatStopStreamResponse chatStopStream(RequestConfigurator req) { + return req.configure(ChatStream.builder().client(this)).build(); + } + @Override public ChatUpdateResponse chatUpdate(ChatUpdateRequest req) throws IOException, SlackApiException { return postFormWithTokenAndParseResponse(toForm(req), Methods.CHAT_UPDATE, getToken(req), ChatUpdateResponse.class); diff --git a/slack-api-client/src/test/java/test_locally/api/methods/AsyncChatStreamTest.java b/slack-api-client/src/test/java/test_locally/api/methods/AsyncChatStreamTest.java new file mode 100644 index 000000000..d97800df8 --- /dev/null +++ b/slack-api-client/src/test/java/test_locally/api/methods/AsyncChatStreamTest.java @@ -0,0 +1,344 @@ +package test_locally.api.methods; + +import com.slack.api.Slack; +import com.slack.api.SlackConfig; +import com.slack.api.methods.AsyncChatStream; +import com.slack.api.methods.AsyncMethodsClient; +import com.slack.api.methods.SlackChatStreamException; +import com.slack.api.methods.request.chat.ChatAppendStreamRequest; +import com.slack.api.methods.request.chat.ChatStartStreamRequest; +import com.slack.api.methods.request.chat.ChatStopStreamRequest; +import com.slack.api.methods.response.chat.ChatAppendStreamResponse; +import com.slack.api.methods.response.chat.ChatStartStreamResponse; +import com.slack.api.methods.response.chat.ChatStopStreamResponse; +import com.slack.api.model.Message; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import util.MockSlackApiServer; + +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static com.slack.api.model.block.Blocks.section; +import static com.slack.api.model.block.composition.BlockCompositions.plainText; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static util.MockSlackApi.ValidToken; + +public class AsyncChatStreamTest { + + private final MockSlackApiServer server = new MockSlackApiServer(); + private final SlackConfig config = new SlackConfig(); + private final Slack slack = Slack.getInstance(config); + + @Before + public void setup() throws Exception { + server.start(); + config.setMethodsEndpointUrlPrefix(server.getMethodsEndpointPrefix()); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void append_buffers_when_under_bufferSize() throws Exception { + AsyncChatStream stream = slack.methodsAsync(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(256)); + + ChatAppendStreamResponse resp = stream.append("hello").get(); + assertThat(resp, is(nullValue())); + assertThat(stream.getStreamTs(), is(nullValue())); + } + + @Test(expected = NullPointerException.class) + public void append_rejects_null_markdown_text() { + AsyncChatStream stream = slack.methodsAsync(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123")); + + stream.append(null); + } + + @Test + public void append_flushes_and_starts_stream_on_first_flush() throws Exception { + AsyncChatStream stream = slack.methodsAsync(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(3)); + + ChatAppendStreamResponse resp = stream.append("hey").get(); // triggers flush + assertThat(resp.isOk(), is(true)); + assertThat(stream.getStreamTs(), is("0000000000.000000")); + } + + @Test + public void stop_completes() throws Exception { + AsyncChatStream stream = slack.methodsAsync(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + stream.append("hello").get(); // buffered only + ChatStopStreamResponse stop = stream.stop().get(); + assertThat(stop.isOk(), is(true)); + assertThat(stream.getStreamTs(), is("0000000000.000000")); + } + + @Test + public void stop_throws_with_stop_response_when_stopStream_fails() throws Exception { + AsyncMethodsClient client = mock(AsyncMethodsClient.class); + ChatStartStreamResponse startResponse = new ChatStartStreamResponse(); + startResponse.setOk(true); + startResponse.setTs("0000000000.000000"); + when(client.chatStartStream(any(ChatStartStreamRequest.class))) + .thenReturn(CompletableFuture.completedFuture(startResponse)); + + ChatStopStreamResponse stopResponse = new ChatStopStreamResponse(); + stopResponse.setOk(false); + stopResponse.setError("fatal_error"); + when(client.chatStopStream(any(ChatStopStreamRequest.class))) + .thenReturn(CompletableFuture.completedFuture(stopResponse)); + + AsyncChatStream stream = AsyncChatStream.builder() + .client(client) + .channel("C123") + .build(); + + try { + stream.stop("hello").get(); + } catch (ExecutionException e) { + assertThat(e.getCause() instanceof SlackChatStreamException, is(true)); + SlackChatStreamException cause = (SlackChatStreamException) e.getCause(); + assertThat(cause.getMessage(), is("Failed to stop stream: fatal_error")); + assertThat(cause.getStopResponse(), is(stopResponse)); + return; + } + org.junit.Assert.fail("Expected SlackChatStreamException"); + } + + @Test(expected = SlackChatStreamException.class) + public void append_throws_after_completed() throws Throwable { + AsyncChatStream stream = slack.methodsAsync(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + stream.stop().get(); + try { + stream.append("nope").get(); + } catch (Exception e) { + // unwrap ExecutionException / CompletionException + Throwable cause = e.getCause() != null ? e.getCause() : e; + throw cause; + } + } + + @Test(expected = SlackChatStreamException.class) + public void stop_throws_after_completed() throws Throwable { + AsyncChatStream stream = slack.methodsAsync(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + stream.stop().get(); + try { + stream.stop().get(); // second stop should throw + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Test + public void stop_with_additional_markdown_text() throws Exception { + AsyncMethodsClient client = mock(AsyncMethodsClient.class); + ChatStartStreamResponse startResponse = new ChatStartStreamResponse(); + startResponse.setOk(true); + startResponse.setTs("0000000000.000000"); + when(client.chatStartStream(any(ChatStartStreamRequest.class))) + .thenReturn(CompletableFuture.completedFuture(startResponse)); + + ChatStopStreamResponse stopResponse = new ChatStopStreamResponse(); + stopResponse.setOk(true); + when(client.chatStopStream(any(ChatStopStreamRequest.class))) + .thenReturn(CompletableFuture.completedFuture(stopResponse)); + + AsyncChatStream stream = AsyncChatStream.builder() + .client(client) + .channel("C123") + .build(); + + stream.append("hello ").get(); + ChatStopStreamResponse stop = stream.stop("world!").get(); + assertThat(stop.isOk(), is(true)); + verify(client).chatStopStream(argThat((ChatStopStreamRequest req) -> "hello world!".equals(req.getMarkdownText()))); + } + + @Test + public void stop_with_blocks_and_metadata() throws Exception { + AsyncChatStream stream = slack.methodsAsync(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + Message.Metadata metadata = new Message.Metadata(); + metadata.setEventType("test_event"); + metadata.setEventPayload(Collections.singletonMap("key", "value")); + + ChatStopStreamResponse stop = stream.stop( + "final text", + Collections.singletonList(section(s -> s.text(plainText("Block text")))), + metadata + ).get(); + assertThat(stop.isOk(), is(true)); + } + + @Test + public void stop_after_stream_already_started() throws Exception { + AsyncChatStream stream = slack.methodsAsync(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1)); // force immediate flush + + // Start the stream via append + ChatAppendStreamResponse appendResp = stream.append("a").get(); + assertThat(appendResp.isOk(), is(true)); + assertThat(stream.getStreamTs(), is(notNullValue())); + + // Now stop - should not call startStream again + ChatStopStreamResponse stop = stream.stop().get(); + assertThat(stop.isOk(), is(true)); + } + + @Test + public void default_buffer_size_is_256() { + AsyncChatStream stream = slack.methodsAsync(ValidToken).chatStream(req -> req + .channel("C123")); + + assertThat(stream.getBufferSize(), is(256)); + } + + @Test(expected = IllegalArgumentException.class) + public void rejects_zero_buffer_size() { + slack.methodsAsync(ValidToken).chatStream(req -> req + .channel("C123") + .bufferSize(0)); + } + + @Test + public void multiple_appends_accumulate_in_buffer() throws Exception { + AsyncMethodsClient client = mock(AsyncMethodsClient.class); + ChatStartStreamResponse startResponse = new ChatStartStreamResponse(); + startResponse.setOk(true); + startResponse.setTs("0000000000.000000"); + when(client.chatStartStream(any(ChatStartStreamRequest.class))) + .thenReturn(CompletableFuture.completedFuture(startResponse)); + + ChatStopStreamResponse stopResponse = new ChatStopStreamResponse(); + stopResponse.setOk(true); + when(client.chatStopStream(any(ChatStopStreamRequest.class))) + .thenReturn(CompletableFuture.completedFuture(stopResponse)); + + AsyncChatStream stream = AsyncChatStream.builder() + .client(client) + .channel("C123") + .bufferSize(256) + .build(); + + stream.append("hello").get(); + stream.append(" ").get(); + stream.append("world").get(); + + stream.stop().get(); + verify(client).chatStopStream(argThat((ChatStopStreamRequest req) -> "hello world".equals(req.getMarkdownText()))); + } + + @Test + public void append_api_calls_are_serialized() throws Exception { + AsyncMethodsClient client = mock(AsyncMethodsClient.class); + CompletableFuture startFuture = new CompletableFuture<>(); + when(client.chatStartStream(any(ChatStartStreamRequest.class))).thenReturn(startFuture); + + ChatAppendStreamResponse appendResponse = new ChatAppendStreamResponse(); + appendResponse.setOk(true); + when(client.chatAppendStream(any(ChatAppendStreamRequest.class))) + .thenReturn(CompletableFuture.completedFuture(appendResponse)); + + AsyncChatStream stream = AsyncChatStream.builder() + .client(client) + .channel("C123") + .bufferSize(1) + .build(); + + CompletableFuture first = stream.append("a"); + CompletableFuture second = stream.append("b"); + + verify(client).chatStartStream(argThat((ChatStartStreamRequest req) -> "a".equals(req.getMarkdownText()))); + verify(client, never()).chatAppendStream(any(ChatAppendStreamRequest.class)); + + ChatStartStreamResponse startResponse = new ChatStartStreamResponse(); + startResponse.setOk(true); + startResponse.setTs("0000000000.000000"); + startFuture.complete(startResponse); + + assertThat(first.get().isOk(), is(true)); + assertThat(second.get().isOk(), is(true)); + verify(client).chatAppendStream(argThat((ChatAppendStreamRequest req) -> "b".equals(req.getMarkdownText()))); + } + + @Test + public void queued_stop_uses_restored_buffer_after_failed_append() throws Exception { + AsyncMethodsClient client = mock(AsyncMethodsClient.class); + ChatStartStreamResponse startResponse = new ChatStartStreamResponse(); + startResponse.setOk(true); + startResponse.setTs("0000000000.000000"); + when(client.chatStartStream(any(ChatStartStreamRequest.class))) + .thenReturn(CompletableFuture.completedFuture(startResponse)); + + CompletableFuture appendFuture = new CompletableFuture<>(); + when(client.chatAppendStream(any(ChatAppendStreamRequest.class))).thenReturn(appendFuture); + + ChatStopStreamResponse stopResponse = new ChatStopStreamResponse(); + stopResponse.setOk(true); + when(client.chatStopStream(any(ChatStopStreamRequest.class))) + .thenReturn(CompletableFuture.completedFuture(stopResponse)); + + AsyncChatStream stream = AsyncChatStream.builder() + .client(client) + .channel("C123") + .bufferSize(1) + .build(); + + assertThat(stream.append("a").get().isOk(), is(true)); + CompletableFuture failedAppend = stream.append("b"); + CompletableFuture stop = stream.stop("c"); + + ChatAppendStreamResponse appendResponse = new ChatAppendStreamResponse(); + appendResponse.setOk(false); + appendResponse.setError("fatal_error"); + appendFuture.complete(appendResponse); + + try { + failedAppend.get(); + org.junit.Assert.fail("Expected SlackChatStreamException"); + } catch (ExecutionException e) { + assertThat(e.getCause() instanceof SlackChatStreamException, is(true)); + } + + assertThat(stop.get().isOk(), is(true)); + verify(client).chatStopStream(argThat((ChatStopStreamRequest req) -> "bc".equals(req.getMarkdownText()))); + } +} diff --git a/slack-api-client/src/test/java/test_locally/api/methods/ChatStreamTest.java b/slack-api-client/src/test/java/test_locally/api/methods/ChatStreamTest.java new file mode 100644 index 000000000..16e33f3be --- /dev/null +++ b/slack-api-client/src/test/java/test_locally/api/methods/ChatStreamTest.java @@ -0,0 +1,264 @@ +package test_locally.api.methods; + +import com.slack.api.Slack; +import com.slack.api.SlackConfig; +import com.slack.api.methods.ChatStream; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackChatStreamException; +import com.slack.api.methods.request.chat.ChatStartStreamRequest; +import com.slack.api.methods.request.chat.ChatStopStreamRequest; +import com.slack.api.methods.response.chat.ChatAppendStreamResponse; +import com.slack.api.methods.response.chat.ChatStartStreamResponse; +import com.slack.api.methods.response.chat.ChatStopStreamResponse; +import com.slack.api.model.Message; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import util.MockSlackApiServer; + +import java.util.Collections; + +import static com.slack.api.model.block.Blocks.section; +import static com.slack.api.model.block.composition.BlockCompositions.plainText; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static util.MockSlackApi.ValidToken; + +public class ChatStreamTest { + + private final MockSlackApiServer server = new MockSlackApiServer(); + private final SlackConfig config = new SlackConfig(); + private final Slack slack = Slack.getInstance(config); + + @Before + public void setup() throws Exception { + server.start(); + config.setMethodsEndpointUrlPrefix(server.getMethodsEndpointPrefix()); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void append_buffers_when_under_bufferSize() throws Exception { + ChatStream stream = slack.methods(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(256)); + + ChatAppendStreamResponse resp = stream.append("hello"); + assertThat(resp, is(nullValue())); + assertThat(stream.getStreamTs(), is(nullValue())); + } + + @Test(expected = NullPointerException.class) + public void append_rejects_null_markdown_text() throws Exception { + ChatStream stream = slack.methods(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123")); + + stream.append(null); + } + + @Test + public void append_flushes_and_starts_stream_on_first_flush() throws Exception { + ChatStream stream = slack.methods(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(3)); + + ChatAppendStreamResponse resp = stream.append("hey"); // triggers flush + assertThat(resp.isOk(), is(true)); + assertThat(stream.getStreamTs(), is("0000000000.000000")); + } + + @Test + public void append_flushes_with_appendStream_after_started() throws Exception { + ChatStream stream = slack.methods(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1)); + + // first flush uses chat.startStream + ChatAppendStreamResponse first = stream.append("a"); + assertThat(first.isOk(), is(true)); + assertThat(stream.getStreamTs(), is("0000000000.000000")); + + // second flush uses chat.appendStream + ChatAppendStreamResponse second = stream.append("b"); + assertThat(second.isOk(), is(true)); + assertThat(stream.getStreamTs(), is("0000000000.000000")); + } + + @Test + public void stop_starts_stream_if_needed_and_completes() throws Exception { + ChatStream stream = slack.methods(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + stream.append("hello"); // buffered only + ChatStopStreamResponse stop = stream.stop(); + assertThat(stop.isOk(), is(true)); + assertThat(stream.getStreamTs(), is("0000000000.000000")); + } + + @Test + public void stop_throws_with_stop_response_when_stopStream_fails() throws Exception { + MethodsClient client = mock(MethodsClient.class); + ChatStartStreamResponse startResponse = new ChatStartStreamResponse(); + startResponse.setOk(true); + startResponse.setTs("0000000000.000000"); + when(client.chatStartStream(any(ChatStartStreamRequest.class))).thenReturn(startResponse); + + ChatStopStreamResponse stopResponse = new ChatStopStreamResponse(); + stopResponse.setOk(false); + stopResponse.setError("fatal_error"); + when(client.chatStopStream(any(ChatStopStreamRequest.class))).thenReturn(stopResponse); + + ChatStream stream = ChatStream.builder() + .client(client) + .channel("C123") + .build(); + + try { + stream.stop("hello"); + Assert.fail("Expected SlackChatStreamException"); + } catch (SlackChatStreamException e) { + assertThat(e.getMessage(), is("Failed to stop stream: fatal_error")); + assertThat(e.getStopResponse(), is(stopResponse)); + } + } + + @Test(expected = SlackChatStreamException.class) + public void append_throws_after_completed() throws Exception { + ChatStream stream = slack.methods(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + stream.stop(); + stream.append("nope"); + } + + @Test(expected = SlackChatStreamException.class) + public void stop_throws_after_completed() throws Exception { + ChatStream stream = slack.methods(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + stream.stop(); + stream.stop(); // second stop should throw + } + + @Test + public void stop_with_additional_markdown_text() throws Exception { + MethodsClient client = mock(MethodsClient.class); + ChatStartStreamResponse startResponse = new ChatStartStreamResponse(); + startResponse.setOk(true); + startResponse.setTs("0000000000.000000"); + when(client.chatStartStream(any(ChatStartStreamRequest.class))).thenReturn(startResponse); + + ChatStopStreamResponse stopResponse = new ChatStopStreamResponse(); + stopResponse.setOk(true); + when(client.chatStopStream(any(ChatStopStreamRequest.class))).thenReturn(stopResponse); + + ChatStream stream = ChatStream.builder() + .client(client) + .channel("C123") + .build(); + + stream.append("hello "); + ChatStopStreamResponse stop = stream.stop("world!"); + assertThat(stop.isOk(), is(true)); + verify(client).chatStopStream(argThat((ChatStopStreamRequest req) -> "hello world!".equals(req.getMarkdownText()))); + } + + @Test + public void stop_with_blocks_and_metadata() throws Exception { + ChatStream stream = slack.methods(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + Message.Metadata metadata = new Message.Metadata(); + metadata.setEventType("test_event"); + metadata.setEventPayload(Collections.singletonMap("key", "value")); + + ChatStopStreamResponse stop = stream.stop( + "final text", + Collections.singletonList(section(s -> s.text(plainText("Block text")))), + metadata + ); + assertThat(stop.isOk(), is(true)); + } + + @Test + public void stop_after_stream_already_started() throws Exception { + ChatStream stream = slack.methods(ValidToken).chatStream(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1)); // force immediate flush + + // Start the stream via append + ChatAppendStreamResponse appendResp = stream.append("a"); + assertThat(appendResp.isOk(), is(true)); + assertThat(stream.getStreamTs(), is(notNullValue())); + + // Now stop - should not call startStream again + ChatStopStreamResponse stop = stream.stop(); + assertThat(stop.isOk(), is(true)); + } + + @Test + public void default_buffer_size_is_256() { + ChatStream stream = slack.methods(ValidToken).chatStream(req -> req + .channel("C123")); + + assertThat(stream.getBufferSize(), is(256)); + } + + @Test(expected = IllegalArgumentException.class) + public void rejects_zero_buffer_size() { + slack.methods(ValidToken).chatStream(req -> req + .channel("C123") + .bufferSize(0)); + } + + @Test + public void multiple_appends_accumulate_in_buffer() throws Exception { + MethodsClient client = mock(MethodsClient.class); + ChatStartStreamResponse startResponse = new ChatStartStreamResponse(); + startResponse.setOk(true); + startResponse.setTs("0000000000.000000"); + when(client.chatStartStream(any(ChatStartStreamRequest.class))).thenReturn(startResponse); + + ChatStopStreamResponse stopResponse = new ChatStopStreamResponse(); + stopResponse.setOk(true); + when(client.chatStopStream(any(ChatStopStreamRequest.class))).thenReturn(stopResponse); + + ChatStream stream = ChatStream.builder() + .client(client) + .channel("C123") + .bufferSize(256) + .build(); + + stream.append("hello"); + stream.append(" "); + stream.append("world"); + + stream.stop(); + verify(client).chatStopStream(argThat((ChatStopStreamRequest req) -> "hello world".equals(req.getMarkdownText()))); + } +}