diff --git a/src/clusterfuzz/_internal/system/archive.py b/src/clusterfuzz/_internal/system/archive.py index 79c7195738..e6cfca0416 100644 --- a/src/clusterfuzz/_internal/system/archive.py +++ b/src/clusterfuzz/_internal/system/archive.py @@ -298,6 +298,11 @@ def extract(self, if (_is_attempting_path_traversal(self._archive_path, output_directory, member)): return None + member_info = self._archive.getmember(member) + if member_info.issym() or member_info.islnk(): + logs.warning('Link member %s found while unpacking archive %s. ' + 'Aborting.' % (member, self._archive_path)) + return None self._archive.extract(member=member, path=output_directory) return os.path.realpath(os.path.join(output_directory, member)) diff --git a/src/clusterfuzz/_internal/tests/core/system/archive_test.py b/src/clusterfuzz/_internal/tests/core/system/archive_test.py index 1f9a5432ea..9a973f7da5 100644 --- a/src/clusterfuzz/_internal/tests/core/system/archive_test.py +++ b/src/clusterfuzz/_internal/tests/core/system/archive_test.py @@ -76,6 +76,71 @@ def test_unpack_absolute_path_traversal(self): shell.remove_directory(output_directory) + def test_unpack_untrusted_tar_symlink_fails(self): + """Test that unpacking an untrusted TAR symlink fails.""" + with tempfile.NamedTemporaryFile(suffix='.tar') as tmp_tar_file: + archive_path = tmp_tar_file.name + + with tarfile.open(archive_path, 'w') as tar: + tarinfo = tarfile.TarInfo(name='linked_file') + tarinfo.type = tarfile.SYMTYPE + tarinfo.linkname = '/tmp/linked_file_target' + tar.addfile(tarinfo) + + output_directory = tempfile.mkdtemp() + + with archive.open(archive_path) as reader: + result = reader.extract_all(output_directory, trusted=False) + self.assertFalse(result) + + self.assertFalse(os.path.lexists(os.path.join(output_directory, + 'linked_file'))) + shell.remove_directory(output_directory) + + def test_unpack_untrusted_tar_hardlink_fails(self): + """Test that unpacking an untrusted TAR hardlink fails.""" + with tempfile.NamedTemporaryFile(suffix='.tar') as tmp_tar_file: + archive_path = tmp_tar_file.name + + with tarfile.open(archive_path, 'w') as tar: + tarinfo = tarfile.TarInfo(name='linked_file') + tarinfo.type = tarfile.LNKTYPE + tarinfo.linkname = '/tmp/linked_file_target' + tar.addfile(tarinfo) + + output_directory = tempfile.mkdtemp() + + with archive.open(archive_path) as reader: + result = reader.extract_all(output_directory, trusted=False) + self.assertFalse(result) + + self.assertFalse(os.path.lexists(os.path.join(output_directory, + 'linked_file'))) + shell.remove_directory(output_directory) + + def test_unpack_untrusted_tar_regular_file_succeeds(self): + """Test that unpacking an untrusted TAR regular file still succeeds.""" + with tempfile.NamedTemporaryFile(suffix='.tar') as tmp_tar_file: + archive_path = tmp_tar_file.name + + with tarfile.open(archive_path, 'w') as tar: + file_data = b'plain content' + tarinfo = tarfile.TarInfo(name='plain_file') + tarinfo.size = len(file_data) + tar.addfile(tarinfo, io.BytesIO(file_data)) + + output_directory = tempfile.mkdtemp() + + with archive.open(archive_path) as reader: + result = reader.extract_all(output_directory, trusted=False) + self.assertTrue(result) + + output_path = os.path.join(output_directory, 'plain_file') + self.assertTrue(os.path.isfile(output_path)) + with open(output_path, 'rb') as output_file: + self.assertEqual(output_file.read(), b'plain content') + shell.remove_directory(output_directory) + class ArchiveReaderTest(unittest.TestCase): """Tests for the archive.iterator function."""