Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b798ae0
[flink-action][server][client] add orphan files cleanup action for re…
platinumhamburg May 29, 2026
b25eea8
[flink][client] refine orphan files cleanup
platinumhamburg May 29, 2026
15de147
[flink][client][server] address PR #3404 review feedback
platinumhamburg Jun 2, 2026
407bd57
[flink][test] fix Flink 2.x open() compatibility and flaky snapshot t…
platinumhamburg Jun 2, 2026
7a26c4e
[server][flink][test] polish orphan cleanup and broaden active snapsh…
platinumhamburg Jun 11, 2026
7565d27
[flink] Polish orphan cleanup action
platinumhamburg Jun 15, 2026
744085e
[server][flink][common] move RemoteLogManifest to fluss-common
platinumhamburg Jun 15, 2026
937e54a
[flink][test] make OrphanFilesCleanITCase extend AbstractTestBase
platinumhamburg Jun 15, 2026
e6efd17
[flink] fail-fast on incompatible Fluss server in orphan cleanup
platinumhamburg Jun 15, 2026
26154d3
[flink] emit orphan cleanup summary through AuditLogger
platinumhamburg Jun 15, 2026
6c0a56c
[flink] drop duplicate LOG.info in StatsAggregateOperator
platinumhamburg Jun 15, 2026
c7fd80d
[flink] Clean empty dirs during scan
platinumhamburg Jun 16, 2026
e1a1b31
[flink] Audit dot orphan files
platinumhamburg Jun 16, 2026
26bb178
[flink] Throttle remote fs cleanup ops
platinumhamburg Jun 16, 2026
577c159
[flink] Use versioned action jars
platinumhamburg Jun 16, 2026
879e6cc
[flink][test] make orphan segment dir old to fix empty-dir sweep asse…
platinumhamburg Jun 16, 2026
bb803fd
[flink] Improve Javadoc wording in OrphanFilesCleanAction
platinumhamburg Jun 18, 2026
b255038
[build] Broaden JaCoCo exclusion for orphan action package
platinumhamburg Jun 22, 2026
fb43e3b
Pass client.* extra configs to Fluss connection for auth support
platinumhamburg Jun 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions fluss-common/src/main/java/org/apache/fluss/fs/FileStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,18 @@ public interface FileStatus {
* @return the corresponding Path to the FileStatus
*/
FsPath getPath();

/**
* Returns the modification time of the file in milliseconds since the epoch.
*
* <p>The default implementation returns {@link Long#MAX_VALUE}, which is interpreted by
* time-based filters (e.g. orphan-files cleanup) as "always fresh" - effectively a fail-closed
* default that prevents deletion when modification time is unavailable. File system
* implementations that can expose modification time SHOULD override this.
*
* @return the modification time in epoch millis, or {@link Long#MAX_VALUE} when unavailable
*/
default long getModificationTime() {
return Long.MAX_VALUE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public FsPath getPath() {
return this.path;
}

@Override
public long getModificationTime() {
return this.file.lastModified();
}

public File getFile() {
return this.file;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
* limitations under the License.
*/

package org.apache.fluss.server.log.remote;
package org.apache.fluss.remote;

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.remote.RemoteLogSegment;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -33,7 +31,7 @@

/**
* A remote log manifest is an immutable list of current {@link RemoteLogSegment} which can
* represent a snapshot of {@link RemoteLogTablet}.
* represent a snapshot of a remote log tablet.
*/
public class RemoteLogManifest {
private final PhysicalTablePath physicalTablePath;
Expand Down Expand Up @@ -122,7 +120,6 @@ public TableBucket getTableBucket() {
return tableBucket;
}

@VisibleForTesting
public List<RemoteLogSegment> getRemoteLogSegmentList() {
return remoteLogSegmentList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
* limitations under the License.
*/

package org.apache.fluss.server.log.remote;
package org.apache.fluss.remote;

import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.remote.RemoteLogSegment;
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.fluss.utils.json.JsonDeserializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class FlussPaths {
public static final String REMOTE_LOG_DIR_NAME = "log";

/** The directory name for storing metadata files (e.g., manifest) for a log tablet. */
private static final String REMOTE_LOG_METADATA_DIR_NAME = "metadata";
public static final String REMOTE_LOG_METADATA_DIR_NAME = "metadata";

/** Suffix of a manifest file. */
private static final String REMOTE_LOG_MANIFEST_FILE_SUFFIX = ".manifest";
Expand Down
54 changes: 54 additions & 0 deletions fluss-common/src/test/java/org/apache/fluss/fs/FileStatusTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.fs;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for default methods of {@link FileStatus}. */
class FileStatusTest {

/**
* An implementation that does not override {@link FileStatus#getModificationTime()} must
* inherit the fail-safe default of {@link Long#MAX_VALUE}, so time-based filters treat the file
* as "always fresh" and never delete it when modification time is unavailable.
*/
@Test
void defaultModificationTimeIsMaxValueFailSafe() {
FileStatus status =
new FileStatus() {
@Override
public long getLen() {
return 0L;
}

@Override
public boolean isDir() {
return false;
}

@Override
public FsPath getPath() {
return new FsPath("/tmp/x");
}
};

assertThat(status.getModificationTime()).isEqualTo(Long.MAX_VALUE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,17 @@
* limitations under the License.
*/

package org.apache.fluss.server.log.remote;
package org.apache.fluss.remote;

import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.remote.RemoteLogSegment;
import org.apache.fluss.utils.json.JsonSerdeTestBase;

import java.util.Arrays;
import java.util.UUID;

/** Tests of {@link org.apache.fluss.server.log.remote.RemoteLogManifestJsonSerde}. */
/** Tests of {@link RemoteLogManifestJsonSerde}. */
class RemoteLogManifestJsonSerdeTest extends JsonSerdeTestBase<RemoteLogManifest> {
private static final PhysicalTablePath TABLE_PATH1 =
PhysicalTablePath.of(TablePath.of("db", "mytable"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public boolean isDir() {
return fileStatus.isDirectory();
}

@Override
public long getModificationTime() {
return fileStatus.getModificationTime();
}

// ------------------------------------------------------------------------

/**
Expand Down
10 changes: 9 additions & 1 deletion fluss-flink/fluss-flink-1.18/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,19 @@
<include>org.apache.fluss:fluss-client</include>
</includes>
</artifactSet>
<filters combine.children="append">
<filter>
<artifact>org.apache.fluss:fluss-flink-common</artifact>
<excludes>
<exclude>org/apache/fluss/flink/action/**</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
</project>
7 changes: 6 additions & 1 deletion fluss-flink/fluss-flink-1.19/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,16 @@
<include>org.apache.fluss:fluss-client</include>
</includes>
</artifactSet>
<transformers combine.children="append">
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.fluss.flink.action.FlussActionEntrypoint</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.fluss.flink.action.orphan.OrphanFilesCleanActionFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.action.orphan;

/** The IT case for orphan files cleanup in Flink 1.19. */
class Flink119OrphanFilesCleanITCase extends OrphanFilesCleanITCase {}
7 changes: 6 additions & 1 deletion fluss-flink/fluss-flink-1.20/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,16 @@
<include>org.apache.fluss:fluss-client</include>
</includes>
</artifactSet>
<transformers combine.children="append">
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.fluss.flink.action.FlussActionEntrypoint</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.fluss.flink.action.orphan.OrphanFilesCleanActionFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.action.orphan;

/** The IT case for orphan files cleanup in Flink 1.20. */
class Flink120OrphanFilesCleanITCase extends OrphanFilesCleanITCase {}
5 changes: 5 additions & 0 deletions fluss-flink/fluss-flink-2.2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@
<include>org.apache.fluss:fluss-client</include>
</includes>
</artifactSet>
<transformers combine.children="append">
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.fluss.flink.action.FlussActionEntrypoint</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import org.apache.flink.util.MultipleParameterTool;

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.Map;

/**
Expand All @@ -43,4 +46,23 @@ public static MultipleParameterToolAdapter fromArgs(String[] args) {
public Map<String, String> toMap() {
return this.multipleParameterTool.toMap();
}

/** Returns whether the given key is present in the parsed arguments. */
public boolean has(String key) {
return this.multipleParameterTool.has(key);
}

/** Returns the value for the given key, or {@code null} if the key is not found. */
@Nullable
public String get(String key) {
return this.multipleParameterTool.get(key);
}

/**
* Returns all values associated with the given key, or {@code null} if the key is not found.
*/
@Nullable
public Collection<String> getMultiParameter(String key) {
return this.multipleParameterTool.getMultiParameter(key);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.fluss.flink.action.orphan.OrphanFilesCleanActionFactory
Loading
Loading