Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions web/pgadmin/browser/server_groups/servers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
from .... import socketio as sio
from pgadmin.utils import get_complete_file_path
from pgadmin.settings.utils import with_object_filters
from pgadmin.utils.server_access import get_server, \
get_user_server_query, get_server_group
from pgadmin.utils.server_access import get_server, get_server_group, \
get_visible_server_query


# File-path keys in connection_params that are per-user and must
Expand Down Expand Up @@ -290,7 +290,7 @@ def get_nodes(self, gid, object_filters):
"""Return a JSON document listing the server groups for the user"""

hide_shared_server = get_preferences()
servers = get_user_server_query().filter(
servers = get_visible_server_query().filter(
Server.servergroup_id == gid, Server.is_adhoc == 0)

driver = get_driver(PG_DEFAULT_DRIVER)
Expand Down Expand Up @@ -646,7 +646,7 @@ def nodes(self, gid):
Return a JSON document listing the servers under this server group
for the user.
"""
servers = get_user_server_query().filter(
servers = get_visible_server_query().filter(
Server.servergroup_id == gid, Server.is_adhoc == 0)

driver = get_driver(PG_DEFAULT_DRIVER)
Expand Down Expand Up @@ -1062,7 +1062,7 @@ def list(self, gid, object_filters):
"""
Return list of attributes of all servers.
"""
servers = get_user_server_query().filter(
servers = get_visible_server_query().filter(
Server.servergroup_id == gid,
Server.is_adhoc == 0).order_by(Server.name)
sg = get_server_group(gid)
Expand Down
4 changes: 2 additions & 2 deletions web/pgadmin/tools/sqleditor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
ERROR_MSG_FAIL_TO_PROMOTE_QT
from pgadmin.model import Server, ServerGroup
from pgadmin.utils.server_access import get_server, \
get_server_groups_for_user, get_user_server_query
get_server_groups_for_user, get_visible_server_query
from pgadmin.tools.schema_diff.node_registry import SchemaDiffRegistry
from pgadmin.settings import get_setting
from pgadmin.utils.preferences import Preferences
Expand Down Expand Up @@ -2413,7 +2413,7 @@ def get_new_connection_data(sgid=None, sid=None):
server_groups = get_server_groups_for_user()
server_group_data = {server_group.name: [] for server_group in
server_groups}
servers = get_user_server_query().filter(
servers = get_visible_server_query().filter(
Server.is_adhoc == 0)

for server in servers:
Expand Down
19 changes: 15 additions & 4 deletions web/pgadmin/utils/server_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,14 @@ def get_server_groups_for_user():

Includes groups owned by the user plus groups containing shared
servers (Server.shared=True, visible to all authenticated users).
Administrators see all groups.
Administrators follow the same visibility rules in browser listings
so private server groups owned by other users are not exposed.
"""
if not config.SERVER_MODE:
return ServerGroup.query.filter_by(
user_id=current_user.id
).all()

if _is_admin():
return ServerGroup.query.all()

return ServerGroup.query.filter(
or_(
ServerGroup.user_id == current_user.id,
Expand All @@ -148,6 +146,19 @@ def get_user_server_query():
if _is_admin():
return Server.query

return get_visible_server_query()


def get_visible_server_query():
"""Return a query for servers visible in browser listings.

This intentionally does not grant Administrator users extra visibility:
browser trees and picker dialogs should only expose servers owned by the
current user or explicitly shared with all users.
"""
if not config.SERVER_MODE:
return Server.query

return Server.query.filter(
or_(
Server.user_id == current_user.id,
Expand Down
97 changes: 97 additions & 0 deletions web/pgadmin/utils/tests/test_server_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2026, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################

from unittest.mock import MagicMock, patch

from pgadmin.utils.route import BaseTestGenerator
from pgadmin.utils.server_access import get_server_groups_for_user, \
get_user_server_query, get_visible_server_query


MODULE = 'pgadmin.utils.server_access'


class ServerGroupAdminVisibilityTestCase(BaseTestGenerator):
"""Validate server group visibility for admin users."""

scenarios = [
('Admin browser listing excludes private server groups owned by '
'other users', dict())
]

@patch(MODULE + '.or_', return_value='visibility_filter')
@patch(MODULE + '.config')
@patch(MODULE + '.db')
@patch(MODULE + '.Server')
@patch(MODULE + '.ServerGroup')
@patch(MODULE + '.current_user')
def runTest(self, mock_current_user, mock_server_group, mock_server,
mock_db, mock_config, _):
mock_config.SERVER_MODE = True
mock_current_user.id = 10
mock_current_user.has_role.return_value = True

visible_groups = [MagicMock(name='visible_group')]
mock_server_group.query.filter.return_value.all.return_value = \
visible_groups

result = get_server_groups_for_user()

self.assertEqual(result, visible_groups)
mock_server_group.query.all.assert_not_called()
mock_server_group.query.filter.assert_called_once_with(
'visibility_filter'
)


class VisibleServerQueryAdminVisibilityTestCase(BaseTestGenerator):
"""Validate browser-visible server query construction for admin users."""

scenarios = [
('Admin visible server query excludes private servers owned by other '
'users', dict())
]

@patch(MODULE + '.or_', return_value='visibility_filter')
@patch(MODULE + '.config')
@patch(MODULE + '.Server')
@patch(MODULE + '.current_user')
def runTest(self, mock_current_user, mock_server, mock_config, _):
mock_config.SERVER_MODE = True
mock_current_user.id = 10
mock_current_user.has_role.return_value = True

scoped_query = MagicMock(name='scoped_query')
mock_server.query.filter.return_value = scoped_query

result = get_visible_server_query()

self.assertEqual(result, scoped_query)
mock_server.query.filter.assert_called_once_with('visibility_filter')


class UserServerQueryAdminAccessTestCase(BaseTestGenerator):
"""Validate generic server query construction for admin users."""

scenarios = [
('Admin generic server query still includes directly accessible '
'private servers', dict())
]

@patch(MODULE + '.config')
@patch(MODULE + '.Server')
@patch(MODULE + '.current_user')
def runTest(self, mock_current_user, mock_server, mock_config):
mock_config.SERVER_MODE = True
mock_current_user.has_role.return_value = True

result = get_user_server_query()

self.assertEqual(result, mock_server.query)
mock_server.query.filter.assert_not_called()