diff --git a/docs/src/_changelog.md b/docs/src/_changelog.md
index 61ba242..8f63c0e 100644
--- a/docs/src/_changelog.md
+++ b/docs/src/_changelog.md
@@ -7,6 +7,14 @@ CurrentModule = DistributedNext
 This documents notable changes in DistributedNext.jl. The format is based on
 [Keep a Changelog](https://keepachangelog.com).
 
+## Unreleased
+
+### Changed
+- DistributedNext no longer requires cluster manager implementations to subtype
+  the [`ClusterManager`](@ref) type; instead, cluster manager support can be
+  implemented using trait methods ([#67]). This was done to allow existing
+  cluster managers to keep supporting both Distributed and DistributedNext.
+
 ## [v1.3.0] - 2026-04-06
 
 ### Changed
diff --git a/docs/src/index.md b/docs/src/index.md
index 1e979ad..c788ef9 100644
--- a/docs/src/index.md
+++ b/docs/src/index.md
@@ -79,13 +79,13 @@ and transport messages between processes. It is possible for Cluster Managers to
 
 ```@docs
 DistributedNext.ClusterManager
+DistributedNext.is_cluster_manager
 DistributedNext.WorkerConfig
 DistributedNext.launch
 DistributedNext.manage
-DistributedNext.kill(::ClusterManager, ::Int, ::WorkerConfig)
-DistributedNext.connect(::ClusterManager, ::Int, ::WorkerConfig)
+DistributedNext.kill(::Any, ::Int, ::WorkerConfig)
+DistributedNext.connect(::Any, ::Int, ::WorkerConfig)
 DistributedNext.init_worker
-DistributedNext.start_worker
 DistributedNext.process_messages
 DistributedNext.default_addprocs_params
 ```
diff --git a/src/DistributedNext.jl b/src/DistributedNext.jl
index dcac2cd..b79a452 100644
--- a/src/DistributedNext.jl
+++ b/src/DistributedNext.jl
@@ -21,7 +21,6 @@ using Base.Threads: Event
 
 using Serialization, Sockets
 import Serialization: serialize, deserialize
-import Sockets: connect, wait_connected
 
 @static if VERSION < v"1.11"
     using ScopedValues: ScopedValue, @with
@@ -171,7 +170,7 @@ include("managers.jl") # LocalManager and SSHManager
     worker_exited_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}()
 
     # Cluster manager
- cluster_manager::Ref{ClusterManager} = Ref{ClusterManager}() + cluster_manager::Ref{Any} = Ref{Any}() # Synchronization worker_lock::ReentrantLock = ReentrantLock() diff --git a/src/cluster.jl b/src/cluster.jl index 24bd1a9..794252a 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -6,9 +6,46 @@ Supertype for cluster managers, which control workers processes as a cluster. Cluster managers implement how workers can be added, removed and communicated with. `SSHManager` and `LocalManager` are subtypes of this. + +!!! note + Subtyping `ClusterManager` is no longer required. DistributedNext now + uses the [`is_cluster_manager`](@ref) trait to recognise cluster managers, + so any type can opt in by defining `DistributedNext.is_cluster_manager(::MyMgr) = true`. + `ClusterManager` is kept for backward compatibility; a subtype is automatically + recognised as a cluster manager via a trait fallback. """ abstract type ClusterManager end +""" + is_cluster_manager(x) -> Bool + +Trait identifying `x` as a cluster manager. Defaults to `false`. Cluster +managers opt in by defining a method returning `true`: + +```julia +DistributedNext.is_cluster_manager(::MyManager) = true +``` + +Any subtype of [`ClusterManager`](@ref) is automatically recognised via a +fallback method. Defining this trait does *not* require subtyping +`ClusterManager`, which lets external types (for example, types already +subtyping `Distributed.ClusterManager`) act as DistributedNext cluster +managers without multiple inheritance. +""" +is_cluster_manager(::Any) = false +is_cluster_manager(::ClusterManager) = true + +# Throw an ArgumentError unless `manager` has opted into the cluster-manager +# trait. Used by entry points accepting user-supplied managers so we fail early +# with a clear message. +function check_cluster_manager(manager) + if !is_cluster_manager(manager) + throw(ArgumentError("$(typeof(manager)) is not recognised as a cluster manager. 
" * + "Define `DistributedNext.is_cluster_manager(::$(typeof(manager))) = true` " * + "to opt in.")) + end +end + function throw_if_cluster_manager_unassigned() isassigned(CTX[].cluster_manager) || error("cluster_manager is unassigned") return nothing @@ -121,12 +158,12 @@ mutable struct Worker w_stream::IO w_serializer::ClusterSerializer # writes can happen from any task hence store the # serializer as part of the Worker object - manager::ClusterManager + manager::Any config::WorkerConfig version::Union{VersionNumber, Nothing} # Julia version of the remote process initialized::Event - function Worker(id::Int, r_stream::IO, w_stream::IO, manager::ClusterManager; + function Worker(id::Int, r_stream::IO, w_stream::IO, manager; version::Union{VersionNumber, Nothing}=nothing, config::WorkerConfig=WorkerConfig()) w = Worker(id) @@ -404,14 +441,14 @@ function parse_connection_info(str) end """ - init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager()) + init_worker(cookie::AbstractString, manager=DefaultClusterManager()) Called by cluster managers implementing custom transports. It initializes a newly launched process as a worker. Command line argument `--worker[=]` has the effect of initializing a process as a worker using TCP/IP sockets for transport. `cookie` is a [`cluster_cookie`](@ref). """ -function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager()) +function init_worker(cookie::AbstractString, manager=DefaultClusterManager()) myrole!(:worker) # On workers, the default cluster manager connects via TCP sockets. Custom @@ -440,7 +477,7 @@ end # Only one addprocs can be in progress at any time # """ - addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers + addprocs(manager; kwargs...) -> List of process identifiers Launches worker processes via the specified cluster manager. 
@@ -479,7 +516,8 @@ if istaskdone(t) # Check if `addprocs` has completed to ensure `fetch` doesn't end ``` """ -function addprocs(manager::ClusterManager; kwargs...) +function addprocs(manager; kwargs...) + check_cluster_manager(manager) params = merge(default_addprocs_params(manager), Dict{Symbol, Any}(kwargs)) init_multi() @@ -492,7 +530,7 @@ function addprocs(manager::ClusterManager; kwargs...) warning_interval, [(manager, params)]) # Add new workers - new_workers = @lock CTX[].worker_lock addprocs_locked(manager::ClusterManager, params) + new_workers = @lock CTX[].worker_lock addprocs_locked(manager, params) # Call worker-started callbacks _run_callbacks_concurrently("worker-started", CTX[].worker_started_callbacks, @@ -501,7 +539,7 @@ function addprocs(manager::ClusterManager; kwargs...) return new_workers end -function addprocs_locked(manager::ClusterManager, params) +function addprocs_locked(manager, params) topology(Symbol(params[:topology])) if CTX[].pgrp.topology !== :all_to_all @@ -574,13 +612,13 @@ function set_valid_processes(plist::Array{Int}) end """ - default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any} + default_addprocs_params(mgr) -> Dict{Symbol, Any} Implemented by cluster managers. The default keyword parameters passed when calling `addprocs(mgr)`. 
The minimal set of options is available by calling `default_addprocs_params()` """ -default_addprocs_params(::ClusterManager) = default_addprocs_params() +default_addprocs_params(_) = default_addprocs_params() default_addprocs_params() = Dict{Symbol,Any}( :topology => :all_to_all, :dir => pwd(), @@ -639,7 +677,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch end end -function create_worker(manager::ClusterManager, wconfig::WorkerConfig) +function create_worker(manager, wconfig::WorkerConfig) # only node 1 can add new nodes, since nobody else has the full list of address:port @assert CTX[].lproc.id == 1 timeout = worker_timeout() diff --git a/src/managers.jl b/src/managers.jl index a65ebab..f5d1f78 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -567,7 +567,7 @@ function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Sy end """ - launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition) + launch(manager, params::Dict, launched::Array, launch_ntfy::Condition) Implemented by cluster managers. For every Julia worker launched by this function, it should append a `WorkerConfig` entry to `launched` and notify `launch_ntfy`. The function MUST exit @@ -577,7 +577,7 @@ keyword arguments [`addprocs`](@ref) was called with. launch """ - manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol) + manage(manager, id::Integer, config::WorkerConfig, op::Symbol) Implemented by cluster managers. It is called on the master process, during a worker's lifetime, with appropriate `op` values: @@ -596,17 +596,17 @@ end """ - connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO) + connect(manager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO) Implemented by cluster managers using custom transports. It should establish a logical connection to worker with id `pid`, specified by `config` and return a pair of `IO` objects. 
Messages from `pid` to current process will be read off `instrm`, while messages to be sent to `pid` will be written to `outstrm`. The custom transport implementation must ensure that messages are delivered and received completely and in order. -`connect(manager::ClusterManager.....)` sets up TCP/IP socket connections in-between +`connect(manager, ...)` sets up TCP/IP socket connections in-between workers. """ -function connect(manager::ClusterManager, pid::Int, config::WorkerConfig) +function connect(manager, pid::Int, config::WorkerConfig) if config.connect_at !== nothing # this is a worker-to-worker setup call. return connect_w2w(pid, config) @@ -728,7 +728,7 @@ function connect_to_worker(host::AbstractString, port::Integer) iptype = typeof(bind_addr) sock = socket_reuse_port(iptype) - connect(sock, bind_addr, UInt16(port)) + Sockets.connect(sock, bind_addr, UInt16(port)) (sock, string(bind_addr)) end @@ -736,7 +736,7 @@ end function connect_to_worker_with_tunnel(host::AbstractString, bind_addr::AbstractString, port::Integer, tunnel_user::AbstractString, sshflags, multiplex) localport = ssh_tunnel(tunnel_user, host, bind_addr, UInt16(port), sshflags, multiplex) - s = connect("localhost", localport) + s = Sockets.connect("localhost", localport) forward = "$localport:$bind_addr:$port" (s, bind_addr, forward) end @@ -755,15 +755,16 @@ end """ - kill(manager::ClusterManager, pid::Int, config::WorkerConfig) + kill(manager, pid::Int, config::WorkerConfig) Implemented by cluster managers. It is called on the master process, by [`rmprocs`](@ref). It should cause the remote worker specified by `pid` to exit. -`kill(manager::ClusterManager.....)` executes a remote `exit()` +`kill(manager, ...)` executes a remote `exit()` on `pid`. 
""" -function kill(manager::ClusterManager, pid::Int, config::WorkerConfig) +function kill(manager, pid::Int, config::WorkerConfig) + check_cluster_manager(manager) remote_do(exit, pid) nothing end diff --git a/src/process_messages.jl b/src/process_messages.jl index 22f3c4d..06492cb 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -125,11 +125,11 @@ end function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool) Sockets.nagle(r_stream, false) Sockets.quickack(r_stream, true) - wait_connected(r_stream) + Sockets.wait_connected(r_stream) if r_stream != w_stream Sockets.nagle(w_stream, false) Sockets.quickack(w_stream, true) - wait_connected(w_stream) + Sockets.wait_connected(w_stream) end message_handler_loop(r_stream, w_stream, incoming) end @@ -367,7 +367,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) send_msg_now(controller, MsgHeader(RRID(0,0), header.notify_oid), JoinCompleteMsg(Sys.CPU_THREADS, getpid())) end -function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig) +function connect_to_peer(manager, rpid::Int, wconfig::WorkerConfig) try (r_s, w_s) = connect(manager, rpid, wconfig) w = Worker(rpid, r_s, w_s, manager; config=wconfig)::Worker diff --git a/test/managers.jl b/test/managers.jl index 1e0fc81..f14b2b1 100644 --- a/test/managers.jl +++ b/test/managers.jl @@ -26,3 +26,13 @@ using DistributedNext: parse_machine, SSHManager, LocalManager sprint((t,x) -> show(t, "text/plain", x), SSHManager("127.0.0.1"))) @test sprint((t,x) -> show(t, "text/plain", x), LocalManager(1, true)) == "LocalManager()" end + +@testset "is_cluster_manager trait" begin + # Subtypes of ClusterManager opt in automatically + @test DistributedNext.is_cluster_manager(LocalManager(1, true)) + + # Arbitrary types do not and cause exceptions + struct NotAManager end + @test !DistributedNext.is_cluster_manager(NotAManager()) + @test_throws ArgumentError 
DistributedNext.addprocs(NotAManager()) +end