diff --git a/web/pgadmin/browser/server_groups/servers/__init__.py b/web/pgadmin/browser/server_groups/servers/__init__.py index 980e10827f4..7a91bf6572f 100644 --- a/web/pgadmin/browser/server_groups/servers/__init__.py +++ b/web/pgadmin/browser/server_groups/servers/__init__.py @@ -46,8 +46,8 @@ from .... import socketio as sio from pgadmin.utils import get_complete_file_path from pgadmin.settings.utils import with_object_filters -from pgadmin.utils.server_access import get_server, \ - get_user_server_query, get_server_group +from pgadmin.utils.server_access import get_server, get_server_group, \ + get_visible_server_query # File-path keys in connection_params that are per-user and must @@ -290,7 +290,7 @@ def get_nodes(self, gid, object_filters): """Return a JSON document listing the server groups for the user""" hide_shared_server = get_preferences() - servers = get_user_server_query().filter( + servers = get_visible_server_query().filter( Server.servergroup_id == gid, Server.is_adhoc == 0) driver = get_driver(PG_DEFAULT_DRIVER) @@ -646,7 +646,7 @@ def nodes(self, gid): Return a JSON document listing the servers under this server group for the user. """ - servers = get_user_server_query().filter( + servers = get_visible_server_query().filter( Server.servergroup_id == gid, Server.is_adhoc == 0) driver = get_driver(PG_DEFAULT_DRIVER) @@ -1062,7 +1062,7 @@ def list(self, gid, object_filters): """ Return list of attributes of all servers. """ - servers = get_user_server_query().filter( + servers = get_visible_server_query().filter( Server.servergroup_id == gid, Server.is_adhoc == 0).order_by(Server.name) sg = get_server_group(gid) diff --git a/web/pgadmin/tools/sqleditor/__init__.py b/web/pgadmin/tools/sqleditor/__init__.py index c9e26df2f00..ed706062b66 100644 --- a/web/pgadmin/tools/sqleditor/__init__.py +++ b/web/pgadmin/tools/sqleditor/__init__.py @@ -64,7 +64,7 @@ ERROR_MSG_FAIL_TO_PROMOTE_QT from pgadmin.model import Server, ServerGroup from pgadmin.utils.server_access import get_server, \ - get_server_groups_for_user, get_user_server_query + get_server_groups_for_user, get_visible_server_query from pgadmin.tools.schema_diff.node_registry import SchemaDiffRegistry from pgadmin.settings import get_setting from pgadmin.utils.preferences import Preferences @@ -2413,7 +2413,7 @@ def get_new_connection_data(sgid=None, sid=None): server_groups = get_server_groups_for_user() server_group_data = {server_group.name: [] for server_group in server_groups} - servers = get_user_server_query().filter( + servers = get_visible_server_query().filter( Server.is_adhoc == 0) for server in servers: diff --git a/web/pgadmin/utils/server_access.py b/web/pgadmin/utils/server_access.py index 1e0c8fe6ad8..fd35dc5adac 100644 --- a/web/pgadmin/utils/server_access.py +++ b/web/pgadmin/utils/server_access.py @@ -114,16 +114,14 @@ def get_server_groups_for_user(): Includes groups owned by the user plus groups containing shared servers (Server.shared=True, visible to all authenticated users). - Administrators see all groups. + Administrators follow the same visibility rules in browser listings + so private server groups owned by other users are not exposed. """ if not config.SERVER_MODE: return ServerGroup.query.filter_by( user_id=current_user.id ).all() - if _is_admin(): - return ServerGroup.query.all() - return ServerGroup.query.filter( or_( ServerGroup.user_id == current_user.id, @@ -148,6 +146,19 @@ def get_user_server_query(): if _is_admin(): return Server.query + return get_visible_server_query() + + +def get_visible_server_query(): + """Return a query for servers visible in browser listings. + + This intentionally does not grant Administrator users extra visibility: + browser trees and picker dialogs should only expose servers owned by the + current user or explicitly shared with all users. + """ + if not config.SERVER_MODE: + return Server.query + return Server.query.filter( or_( Server.user_id == current_user.id, diff --git a/web/pgadmin/utils/tests/test_server_access.py b/web/pgadmin/utils/tests/test_server_access.py new file mode 100644 index 00000000000..53ace7535e5 --- /dev/null +++ b/web/pgadmin/utils/tests/test_server_access.py @@ -0,0 +1,97 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2026, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +from unittest.mock import MagicMock, patch + +from pgadmin.utils.route import BaseTestGenerator +from pgadmin.utils.server_access import get_server_groups_for_user, \ + get_user_server_query, get_visible_server_query + + +MODULE = 'pgadmin.utils.server_access' + + +class ServerGroupAdminVisibilityTestCase(BaseTestGenerator): + """Validate server group visibility for admin users.""" + + scenarios = [ + ('Admin browser listing excludes private server groups owned by ' + 'other users', dict()) + ] + + @patch(MODULE + '.or_', return_value='visibility_filter') + @patch(MODULE + '.config') + @patch(MODULE + '.db') + @patch(MODULE + '.Server') + @patch(MODULE + '.ServerGroup') + @patch(MODULE + '.current_user') + def runTest(self, mock_current_user, mock_server_group, mock_server, + mock_db, mock_config, _): + mock_config.SERVER_MODE = True + mock_current_user.id = 10 + mock_current_user.has_role.return_value = True + + visible_groups = [MagicMock(name='visible_group')] + mock_server_group.query.filter.return_value.all.return_value = \ + visible_groups + + result = get_server_groups_for_user() + + self.assertEqual(result, visible_groups) + mock_server_group.query.all.assert_not_called() + mock_server_group.query.filter.assert_called_once_with( + 'visibility_filter' + ) + + +class VisibleServerQueryAdminVisibilityTestCase(BaseTestGenerator): + """Validate browser-visible server query construction for admin users.""" + + scenarios = [ + ('Admin visible server query excludes private servers owned by other ' + 'users', dict()) + ] + + @patch(MODULE + '.or_', return_value='visibility_filter') + @patch(MODULE + '.config') + @patch(MODULE + '.Server') + @patch(MODULE + '.current_user') + def runTest(self, mock_current_user, mock_server, mock_config, _): + mock_config.SERVER_MODE = True + mock_current_user.id = 10 + mock_current_user.has_role.return_value = True + + scoped_query = MagicMock(name='scoped_query') + mock_server.query.filter.return_value = scoped_query + + result = get_visible_server_query() + + self.assertEqual(result, scoped_query) + mock_server.query.filter.assert_called_once_with('visibility_filter') + + +class UserServerQueryAdminAccessTestCase(BaseTestGenerator): + """Validate generic server query construction for admin users.""" + + scenarios = [ + ('Admin generic server query still includes directly accessible ' + 'private servers', dict()) + ] + + @patch(MODULE + '.config') + @patch(MODULE + '.Server') + @patch(MODULE + '.current_user') + def runTest(self, mock_current_user, mock_server, mock_config): + mock_config.SERVER_MODE = True + mock_current_user.has_role.return_value = True + + result = get_user_server_query() + + self.assertEqual(result, mock_server.query) + mock_server.query.filter.assert_not_called()