Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/clusterfuzz/_internal/system/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ def extract(self,
if (_is_attempting_path_traversal(self._archive_path, output_directory,
member)):
return None
member_info = self._archive.getmember(member)
if member_info.issym() or member_info.islnk():
logs.warning('Link member %s found while unpacking archive %s. '
'Aborting.' % (member, self._archive_path))
return None

self._archive.extract(member=member, path=output_directory)
return os.path.realpath(os.path.join(output_directory, member))
Expand Down
65 changes: 65 additions & 0 deletions src/clusterfuzz/_internal/tests/core/system/archive_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,71 @@ def test_unpack_absolute_path_traversal(self):

shell.remove_directory(output_directory)

def test_unpack_untrusted_tar_symlink_fails(self):
"""Test that unpacking an untrusted TAR symlink fails."""
with tempfile.NamedTemporaryFile(suffix='.tar') as tmp_tar_file:
archive_path = tmp_tar_file.name

with tarfile.open(archive_path, 'w') as tar:
tarinfo = tarfile.TarInfo(name='linked_file')
tarinfo.type = tarfile.SYMTYPE
tarinfo.linkname = '/tmp/linked_file_target'
tar.addfile(tarinfo)

output_directory = tempfile.mkdtemp()

with archive.open(archive_path) as reader:
result = reader.extract_all(output_directory, trusted=False)
self.assertFalse(result)

self.assertFalse(os.path.lexists(os.path.join(output_directory,
'linked_file')))
shell.remove_directory(output_directory)

def test_unpack_untrusted_tar_hardlink_fails(self):
"""Test that unpacking an untrusted TAR hardlink fails."""
with tempfile.NamedTemporaryFile(suffix='.tar') as tmp_tar_file:
archive_path = tmp_tar_file.name

with tarfile.open(archive_path, 'w') as tar:
tarinfo = tarfile.TarInfo(name='linked_file')
tarinfo.type = tarfile.LNKTYPE
tarinfo.linkname = '/tmp/linked_file_target'
tar.addfile(tarinfo)

output_directory = tempfile.mkdtemp()

with archive.open(archive_path) as reader:
result = reader.extract_all(output_directory, trusted=False)
self.assertFalse(result)

self.assertFalse(os.path.lexists(os.path.join(output_directory,
'linked_file')))
shell.remove_directory(output_directory)

def test_unpack_untrusted_tar_regular_file_succeeds(self):
"""Test that unpacking an untrusted TAR regular file still succeeds."""
with tempfile.NamedTemporaryFile(suffix='.tar') as tmp_tar_file:
archive_path = tmp_tar_file.name

with tarfile.open(archive_path, 'w') as tar:
file_data = b'plain content'
tarinfo = tarfile.TarInfo(name='plain_file')
tarinfo.size = len(file_data)
tar.addfile(tarinfo, io.BytesIO(file_data))

output_directory = tempfile.mkdtemp()

with archive.open(archive_path) as reader:
result = reader.extract_all(output_directory, trusted=False)
self.assertTrue(result)

output_path = os.path.join(output_directory, 'plain_file')
self.assertTrue(os.path.isfile(output_path))
with open(output_path, 'rb') as output_file:
self.assertEqual(output_file.read(), b'plain content')
shell.remove_directory(output_directory)


class ArchiveReaderTest(unittest.TestCase):
"""Tests for the archive.iterator function."""
Expand Down