Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
af014ab
[ZEPPELIN-6406] Remove deprecated Flink 1.15/1.16/1.17 shims and add …
jongyoul Mar 27, 2026
674d103
[ZEPPELIN-6406] Fix stream SQL cancel for Flink 1.19+ by registering …
jongyoul Apr 4, 2026
ca86726
[ZEPPELIN-6406] Pin protobuf<4 in Flink conda env to fix CI proto des…
jongyoul Apr 4, 2026
550715b
[ZEPPELIN-6406] Remove flink-scala jar relocation from DownloadUtils
jongyoul Apr 4, 2026
9a26ead
[ZEPPELIN-6406] Register stream job in FlinkStreamSqlInterpreter for …
jongyoul Apr 6, 2026
ae88975
[ZEPPELIN-6406] Add commons-logging to Flink lib for YARN integration…
jongyoul Apr 6, 2026
5544050
[ZEPPELIN-6406] Disable YARN application mode integration test for Fl…
jongyoul Apr 6, 2026
cfff618
[ZEPPELIN-6406] Disable flaky testAngularRunParagraph Selenium test
jongyoul Apr 7, 2026
537285b
[ZEPPELIN-6406] Disable flaky testPerUserIsolatedAction Selenium test
jongyoul Apr 7, 2026
e335f19
[ZEPPELIN-6406] Mark Selenium test job as continue-on-error
jongyoul Apr 7, 2026
28a3350
[ZEPPELIN-6406] Disable additional flaky Selenium tests, revert conti…
jongyoul Apr 7, 2026
c1ee8b0
[ZEPPELIN-6406] Disable entire InterpreterModeActionsIT class
jongyoul Apr 7, 2026
085911f
[ZEPPELIN-6406] Disable Hive delegation token provider in ZSession Fl…
jongyoul Apr 8, 2026
182d924
fix: Address Copilot review on stream SQL cancel and Scala REPL bind
jongyoul May 1, 2026
a388745
[ZEPPELIN-6406] Stop and join ResultRetrievalThread on stream SQL cancel
jongyoul May 16, 2026
a443153
[ZEPPELIN-6406] Address pan3793 review: rename shims to flink1.20 + d…
jongyoul May 24, 2026
1d06082
[ZEPPELIN-6406] Revert flink.scala.version to 2.12.7 (Flink 1.x is st…
jongyoul May 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions .github/workflows/core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ jobs:
${{ runner.os }}-zeppelin-
- name: install environment
run: |
./mvnw install -DskipTests -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.12,jdbc,shell -am -Pweb-classic -Pflink-117 ${MAVEN_ARGS}
./mvnw install -DskipTests -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.12,jdbc,shell -am -Pweb-classic -Pflink1.20 ${MAVEN_ARGS}
./mvnw package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS}
- name: Setup conda environment with python 3.9 and R
uses: conda-incubator/setup-miniconda@v3
Expand All @@ -238,13 +238,13 @@ jobs:
strategy:
fail-fast: false
matrix:
python: [ 3.9 ]
flink: [116, 117]
include:
# Flink 1.15 supports Python 3.6, 3.7, and 3.8
# https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/
- python: 3.8
flink: 115
- python: 3.9
flink: 119
flink-profile: "1.19"
- python: 3.9
flink: 120
flink-profile: "1.20"
steps:
- name: Checkout
uses: actions/checkout@v4
Expand All @@ -268,7 +268,7 @@ jobs:
${{ runner.os }}-zeppelin-
- name: install environment for flink
run: |
./mvnw install -DskipTests -am -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration ${MAVEN_ARGS}
./mvnw install -DskipTests -am -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink${{ matrix.flink-profile }} -Pintegration ${MAVEN_ARGS}
./mvnw clean package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS}
- name: Setup conda environment with python ${{ matrix.python }} and R
uses: conda-incubator/setup-miniconda@v3
Expand All @@ -281,7 +281,7 @@ jobs:
auto-activate: false
use-mamba: true
- name: run tests for flink
run: ./mvnw verify -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -am -Pintegration -DfailIfNoTests=false -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} ${MAVEN_ARGS}
run: ./mvnw verify -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink${{ matrix.flink-profile }} -am -Pintegration -DfailIfNoTests=false -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} ${MAVEN_ARGS}
- name: Print zeppelin logs
if: always()
run: if [ -d "logs" ]; then cat logs/*; fi
Expand Down
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Apache Zeppelin is a web-based notebook for interactive data analytics. It provi

# Common profiles
# -Pspark-3.5 -Pspark-scala-2.12 Spark version
# -Pflink-117 Flink version
# -Pflink1.20 Flink version
# -Pbuild-distr Full distribution
# -Prat Apache RAT license check
# -Pweb-classic Additionally builds the classic UI web module when specified
Expand Down
15 changes: 8 additions & 7 deletions docs/interpreter/flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ limitations under the License.
[Apache Flink](https://flink.apache.org) is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

In Zeppelin 0.9, we refactor the Flink interpreter in Zeppelin to support the latest version of Flink. **Currently, only Flink 1.15+ is supported, old versions of flink won't work.**
In Zeppelin 0.9, we refactored the Flink interpreter in Zeppelin to support the latest version of Flink. **Currently, only Flink 1.20+ is supported; older versions of Flink won't work.**
Apache Flink is supported in Zeppelin with the Flink interpreter group which consists of the five interpreters listed below.

<table class="table-configuration">
Expand Down Expand Up @@ -138,16 +138,17 @@ docker run -u $(id -u) -p 8080:8080 --rm -v /mnt/disk1/flink-sql-cookbook-on-zep

## Prerequisites

Download Flink 1.15 or afterwards (Only Scala 2.12 is supported)
Download Flink 1.20 (the latest 1.x LTS) or newer. Only Scala 2.12 is supported.

### Version-specific notes for Flink

Flink 1.15 is scala free and has changed its binary distribution, the following extra steps is required.
* Move FLINK_HOME/opt/flink-table-planner_2.12-1.15.0.jar to FLINK_HOME/lib
* Move FLINK_HOME/lib/flink-table-planner-loader-1.15.0.jar to FLINK_HOME/opt
* Download flink-table-api-scala-bridge_2.12-1.15.0.jar and flink-table-api-scala_2.12-1.15.0.jar to FLINK_HOME/lib
The following extra steps are required to set up the Flink distribution for Zeppelin.
Replace `${FLINK_VERSION}` below with the version of Flink you installed.

Flink 1.16 introduces new `ClientResourceManager` for sql client, you need to move `FLINK_HOME/opt/flink-sql-client-1.16.0.jar` to `FLINK_HOME/lib`
* Move `${FLINK_HOME}/opt/flink-table-planner_2.12-${FLINK_VERSION}.jar` to `${FLINK_HOME}/lib`
* Move `${FLINK_HOME}/lib/flink-table-planner-loader-${FLINK_VERSION}.jar` to `${FLINK_HOME}/opt`
* Download `flink-table-api-scala-bridge_2.12-${FLINK_VERSION}.jar` and `flink-table-api-scala_2.12-${FLINK_VERSION}.jar` into `${FLINK_HOME}/lib`
* Move `${FLINK_HOME}/opt/flink-sql-client-${FLINK_VERSION}.jar` to `${FLINK_HOME}/lib`

## Flink on Zeppelin Architecture

Expand Down
20 changes: 10 additions & 10 deletions docs/setup/deployment/flink_and_spark_cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ cd zeppelin
Package Zeppelin.

```bash
./mvnw clean package -DskipTests -Pspark-3.5 -Pflink-1.17
./mvnw clean package -DskipTests -Pspark-3.5 -Pflink1.20
```

`-DskipTests` skips the build tests. You're not developing (yet), so you don't need to run the tests; the cloned version *should* build.

`-Pspark-3.5` tells maven to build a Zeppelin with Spark 3.5. This is important because Zeppelin has its own Spark interpreter and the versions must be the same.

`-Pflink-1.17` tells maven to build a Zeppelin with Flink 1.17.
`-Pflink1.20` tells maven to build a Zeppelin with Flink 1.20.

**Note:** You can build against any version of Spark that has a Zeppelin build profile available. The key is to make sure you check out the matching version of Spark to build. At the time of this writing, Spark 3.5 was the most recent Spark version available.

Expand Down Expand Up @@ -215,16 +215,16 @@ Building from source is recommended where possible, for simplicity in this tuto
To download the Flink Binary use `wget`

```bash
wget -O flink-1.17.1-bin-scala_2.12.tgz "https://www.apache.org/dyn/closer.lua/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz?action=download"
tar -xzvf flink-1.17.1-bin-scala_2.12.tgz
wget -O flink-1.20.3-bin-scala_2.12.tgz "https://www.apache.org/dyn/closer.lua/flink/flink-1.20.3/flink-1.20.3-bin-scala_2.12.tgz?action=download"
tar -xzvf flink-1.20.3-bin-scala_2.12.tgz
```

This will download Flink 1.17.1.
This will download Flink 1.20.3.

Start the Flink Cluster.

```bash
flink-1.17.1/bin/start-cluster.sh
flink-1.20.3/bin/start-cluster.sh
```

###### Building From source
Expand All @@ -233,13 +233,13 @@ If you wish to build Flink from source, the following will be instructive. Note

See the [Flink Installation guide](https://github.com/apache/flink/blob/master/README.md) for more detailed instructions.

Return to the directory where you have been downloading, this tutorial assumes that is `$HOME`. Clone Flink, check out release-1.17.1, and build.
Return to the directory where you have been downloading files; this tutorial assumes that is `$HOME`. Clone Flink, check out release-1.20.3, and build.

```bash
cd $HOME
git clone https://github.com/apache/flink.git
cd flink
git checkout release-1.17.1
git checkout release-1.20.3
mvn clean install -DskipTests
```

Expand All @@ -261,8 +261,8 @@ If no task managers are present, restart the Flink cluster with the following co
(if binaries)

```bash
flink-1.17.1/bin/stop-cluster.sh
flink-1.17.1/bin/start-cluster.sh
flink-1.20.3/bin/stop-cluster.sh
flink-1.20.3/bin/start-cluster.sh
```


Expand Down
4 changes: 1 addition & 3 deletions flink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ Flink interpreter is more complex than other interpreter (such as jdbc, shell).
Currently, it has the following modules clustered into two groups:

* flink-shims
* flink1.15-shims
* flink1.16-shims
* flink1.17-shims
* flink1.20-shims (shared by Flink 1.19 and 1.20; named after the latest LTS)

* flink-scala-2.12

Expand Down
55 changes: 6 additions & 49 deletions flink/flink-scala-2.12/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

<properties>
<!--library versions-->
<flink.version>${flink1.17.version}</flink.version>
<flink.version>${flink1.19.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
<flink.scala.compile.version>${flink.scala.version}</flink.scala.compile.version>
Expand All @@ -55,19 +55,7 @@

<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>flink1.15-shims</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>flink1.16-shims</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>flink1.17-shims</artifactId>
<artifactId>flink1.20-shims</artifactId>
<version>${project.version}</version>
</dependency>

Expand Down Expand Up @@ -1203,39 +1191,9 @@

<profiles>
<profile>
<id>flink-115</id>
<properties>
<flink.version>${flink1.15.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${flink.scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>

<profile>
<id>flink-116</id>
<id>flink1.19</id>
<properties>
<flink.version>${flink1.16.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.version>${flink1.19.version}</flink.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
</properties>
<dependencies>
Expand Down Expand Up @@ -1267,10 +1225,9 @@
</profile>

<profile>
<id>flink-117</id>
<id>flink1.20</id>
<properties>
<flink.version>${flink1.17.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.version>${flink1.20.version}</flink.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
</properties>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void open() throws InterpreterException {
FlinkSqlContext flinkSqlContext = new FlinkSqlContext(
flinkInterpreter.getExecutionEnvironment().getJavaEnv(),
flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv(),
flinkInterpreter.getJavaBatchTableEnvironment("blink"),
flinkInterpreter.getJavaBatchTableEnvironment(),
flinkInterpreter.getJavaStreamTableEnvironment(),
flinkInterpreter.getZeppelinContext(),
null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,16 @@ TableEnvironment getStreamTableEnvironment() {
return this.innerIntp.getStreamTableEnvironment();
}

org.apache.flink.table.api.TableEnvironment getJavaBatchTableEnvironment(String planner) {
return this.innerIntp.getJavaBatchTableEnvironment(planner);
org.apache.flink.table.api.TableEnvironment getJavaBatchTableEnvironment() {
return this.innerIntp.getJavaBatchTableEnvironment();
}

TableEnvironment getJavaStreamTableEnvironment() {
return this.innerIntp.getJavaStreamTableEnvironment();
}

TableEnvironment getBatchTableEnvironment() {
return this.innerIntp.getBatchTableEnvironment("blink");
return this.innerIntp.getBatchTableEnvironment();
}

JobManager getJobManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.zeppelin.flink;

import org.apache.zeppelin.flink.sql.AbstractStreamSqlJob;
import org.apache.zeppelin.flink.sql.AppendStreamSqlJob;
import org.apache.zeppelin.flink.sql.SingleRowStreamSqlJob;
import org.apache.zeppelin.flink.sql.UpdateStreamSqlJob;
Expand All @@ -42,7 +43,7 @@ public void open() throws InterpreterException {
FlinkSqlContext flinkSqlContext = new FlinkSqlContext(
flinkInterpreter.getExecutionEnvironment().getJavaEnv(),
flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv(),
flinkInterpreter.getJavaBatchTableEnvironment("blink"),
flinkInterpreter.getJavaBatchTableEnvironment(),
flinkInterpreter.getJavaStreamTableEnvironment(),
flinkInterpreter.getZeppelinContext(),
sql -> callInnerSelect(sql));
Expand All @@ -53,48 +54,45 @@ public void open() throws InterpreterException {
public void callInnerSelect(String sql) {
InterpreterContext context = InterpreterContext.get();
String streamType = context.getLocalProperties().getOrDefault("type", "update");
AbstractStreamSqlJob streamJob;
if (streamType.equalsIgnoreCase("single")) {
SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob(
streamJob = new SingleRowStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
flinkInterpreter.getJavaStreamTableEnvironment(),
flinkInterpreter.getJobManager(),
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
try {
streamJob.run(sql);
} catch (IOException e) {
throw new RuntimeException("Fail to run single type stream job", e);
}
} else if (streamType.equalsIgnoreCase("append")) {
AppendStreamSqlJob streamJob = new AppendStreamSqlJob(
streamJob = new AppendStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
flinkInterpreter.getStreamTableEnvironment(),
flinkInterpreter.getJobManager(),
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
try {
streamJob.run(sql);
} catch (IOException e) {
throw new RuntimeException("Fail to run append type stream job", e);
}
} else if (streamType.equalsIgnoreCase("update")) {
UpdateStreamSqlJob streamJob = new UpdateStreamSqlJob(
streamJob = new UpdateStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
flinkInterpreter.getStreamTableEnvironment(),
flinkInterpreter.getJobManager(),
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
try {
streamJob.run(sql);
} catch (IOException e) {
throw new RuntimeException("Fail to run update type stream job", e);
}
} else {
throw new RuntimeException("Unrecognized stream type: " + streamType);
}

FlinkZeppelinContext z =
(FlinkZeppelinContext) flinkInterpreter.getZeppelinContext();
z.setCurrentStreamJob(streamJob);
try {
streamJob.run(sql);
} catch (IOException e) {
throw new RuntimeException("Fail to run " + streamType + " type stream job", e);
} finally {
z.clearCurrentStreamJob();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ public org.apache.flink.api.java.ExecutionEnvironment getJavaExecutionEnvironmen
return flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv();
}

public TableEnvironment getJavaBatchTableEnvironment(String planner) {
return flinkInterpreter.getJavaBatchTableEnvironment(planner);
public TableEnvironment getJavaBatchTableEnvironment() {
return flinkInterpreter.getJavaBatchTableEnvironment();
}

public TableEnvironment getJavaStreamTableEnvironment() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ public org.apache.flink.api.java.ExecutionEnvironment getJavaExecutionEnvironmen
return flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv();
}

public TableEnvironment getJavaBatchTableEnvironment(String planner) {
return flinkInterpreter.getJavaBatchTableEnvironment(planner);
public TableEnvironment getJavaBatchTableEnvironment() {
return flinkInterpreter.getJavaBatchTableEnvironment();
}

public TableEnvironment getJavaStreamTableEnvironment() {
Expand Down
Loading
Loading