Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/src/_changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ CurrentModule = DistributedNext
This documents notable changes in DistributedNext.jl. The format is based on
[Keep a Changelog](https://keepachangelog.com).

## Unreleased

### Changed
- DistributedNext no longer requires cluster manager implementations to subtype
the [`ClusterManager`](@ref) type; instead, cluster manager support can be
implemented using trait methods ([#67]). This was done to allow existing
cluster managers to keep supporting both Distributed and DistributedNext.

## [v1.3.0] - 2026-04-06

### Changed
Expand Down
6 changes: 3 additions & 3 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ and transport messages between processes. It is possible for Cluster Managers to

```@docs
DistributedNext.ClusterManager
DistributedNext.is_cluster_manager
DistributedNext.WorkerConfig
DistributedNext.launch
DistributedNext.manage
DistributedNext.kill(::ClusterManager, ::Int, ::WorkerConfig)
DistributedNext.connect(::ClusterManager, ::Int, ::WorkerConfig)
DistributedNext.kill(::Any, ::Int, ::WorkerConfig)
DistributedNext.connect(::Any, ::Int, ::WorkerConfig)
DistributedNext.init_worker
DistributedNext.start_worker
DistributedNext.process_messages
DistributedNext.default_addprocs_params
```
3 changes: 1 addition & 2 deletions src/DistributedNext.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ using Base.Threads: Event

using Serialization, Sockets
import Serialization: serialize, deserialize
import Sockets: connect, wait_connected

@static if VERSION < v"1.11"
using ScopedValues: ScopedValue, @with
Expand Down Expand Up @@ -171,7 +170,7 @@ include("managers.jl") # LocalManager and SSHManager
worker_exited_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}()

# Cluster manager
cluster_manager::Ref{ClusterManager} = Ref{ClusterManager}()
cluster_manager::Ref{Any} = Ref{Any}()

# Synchronization
worker_lock::ReentrantLock = ReentrantLock()
Expand Down
60 changes: 49 additions & 11 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,46 @@
Supertype for cluster managers, which control worker processes as a cluster.
Cluster managers implement how workers can be added, removed and communicated with.
`SSHManager` and `LocalManager` are subtypes of this.

!!! note
Subtyping `ClusterManager` is no longer required. DistributedNext now
uses the [`is_cluster_manager`](@ref) trait to recognise cluster managers,
so any type can opt in by defining `DistributedNext.is_cluster_manager(::MyMgr) = true`.
`ClusterManager` is kept for backward compatibility; a subtype is automatically
recognised as a cluster manager via a trait fallback.
"""
abstract type ClusterManager end

"""
    is_cluster_manager(x) -> Bool

Trait that marks `x` as a cluster manager. The fallback for arbitrary values
returns `false`; a type opts in by adding a method that returns `true`:

```julia
DistributedNext.is_cluster_manager(::MyManager) = true
```

Every subtype of [`ClusterManager`](@ref) is covered automatically by a
dedicated method, so existing managers keep working unchanged. Because the
trait does not demand subtyping `ClusterManager`, external types (for example,
types already subtyping `Distributed.ClusterManager`) can act as
DistributedNext cluster managers without needing multiple inheritance.
"""
is_cluster_manager(::ClusterManager) = true
is_cluster_manager(::Any) = false

# Guard used by entry points that accept user-supplied managers: succeed
# silently when `manager` has opted into the cluster-manager trait, otherwise
# fail early with an ArgumentError explaining how to opt in.
function check_cluster_manager(manager)
    is_cluster_manager(manager) && return
    throw(ArgumentError("$(typeof(manager)) is not recognised as a cluster manager. " *
                        "Define `DistributedNext.is_cluster_manager(::$(typeof(manager))) = true` " *
                        "to opt in."))
end

function throw_if_cluster_manager_unassigned()
isassigned(CTX[].cluster_manager) || error("cluster_manager is unassigned")
return nothing
Expand Down Expand Up @@ -121,12 +158,12 @@ mutable struct Worker
w_stream::IO
w_serializer::ClusterSerializer # writes can happen from any task hence store the
# serializer as part of the Worker object
manager::ClusterManager
manager::Any
config::WorkerConfig
version::Union{VersionNumber, Nothing} # Julia version of the remote process
initialized::Event

function Worker(id::Int, r_stream::IO, w_stream::IO, manager::ClusterManager;
function Worker(id::Int, r_stream::IO, w_stream::IO, manager;
version::Union{VersionNumber, Nothing}=nothing,
config::WorkerConfig=WorkerConfig())
w = Worker(id)
Expand Down Expand Up @@ -404,14 +441,14 @@ function parse_connection_info(str)
end

"""
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
init_worker(cookie::AbstractString, manager=DefaultClusterManager())

Called by cluster managers implementing custom transports. It initializes a newly launched
process as a worker. Command line argument `--worker[=<cookie>]` has the effect of initializing a
process as a worker using TCP/IP sockets for transport.
`cookie` is a [`cluster_cookie`](@ref).
"""
function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
function init_worker(cookie::AbstractString, manager=DefaultClusterManager())
myrole!(:worker)

# On workers, the default cluster manager connects via TCP sockets. Custom
Expand Down Expand Up @@ -440,7 +477,7 @@ end
# Only one addprocs can be in progress at any time
#
"""
addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers
addprocs(manager; kwargs...) -> List of process identifiers

Launches worker processes via the specified cluster manager.

Expand Down Expand Up @@ -479,7 +516,8 @@ if istaskdone(t) # Check if `addprocs` has completed to ensure `fetch` doesn't
end
```
"""
function addprocs(manager::ClusterManager; kwargs...)
function addprocs(manager; kwargs...)
check_cluster_manager(manager)
params = merge(default_addprocs_params(manager), Dict{Symbol, Any}(kwargs))

init_multi()
Expand All @@ -492,7 +530,7 @@ function addprocs(manager::ClusterManager; kwargs...)
warning_interval, [(manager, params)])

# Add new workers
new_workers = @lock CTX[].worker_lock addprocs_locked(manager::ClusterManager, params)
new_workers = @lock CTX[].worker_lock addprocs_locked(manager, params)

# Call worker-started callbacks
_run_callbacks_concurrently("worker-started", CTX[].worker_started_callbacks,
Expand All @@ -501,7 +539,7 @@ function addprocs(manager::ClusterManager; kwargs...)
return new_workers
end

function addprocs_locked(manager::ClusterManager, params)
function addprocs_locked(manager, params)
topology(Symbol(params[:topology]))

if CTX[].pgrp.topology !== :all_to_all
Expand Down Expand Up @@ -574,13 +612,13 @@ function set_valid_processes(plist::Array{Int})
end

"""
default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
default_addprocs_params(mgr) -> Dict{Symbol, Any}

Implemented by cluster managers. The default keyword parameters passed when calling
`addprocs(mgr)`. The minimal set of options is available by calling
`default_addprocs_params()`
"""
default_addprocs_params(::ClusterManager) = default_addprocs_params()
default_addprocs_params(_) = default_addprocs_params()
default_addprocs_params() = Dict{Symbol,Any}(
:topology => :all_to_all,
:dir => pwd(),
Expand Down Expand Up @@ -639,7 +677,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
end
end

function create_worker(manager::ClusterManager, wconfig::WorkerConfig)
function create_worker(manager, wconfig::WorkerConfig)
# only node 1 can add new nodes, since nobody else has the full list of address:port
@assert CTX[].lproc.id == 1
timeout = worker_timeout()
Expand Down
21 changes: 11 additions & 10 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Sy
end

"""
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
launch(manager, params::Dict, launched::Array, launch_ntfy::Condition)

Implemented by cluster managers. For every Julia worker launched by this function, it should
append a `WorkerConfig` entry to `launched` and notify `launch_ntfy`. The function MUST exit
Expand All @@ -577,7 +577,7 @@ keyword arguments [`addprocs`](@ref) was called with.
launch

"""
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
manage(manager, id::Integer, config::WorkerConfig, op::Symbol)

Implemented by cluster managers. It is called on the master process, during a worker's
lifetime, with appropriate `op` values:
Expand All @@ -596,17 +596,17 @@ end


"""
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
connect(manager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

Implemented by cluster managers using custom transports. It should establish a logical
connection to worker with id `pid`, specified by `config` and return a pair of `IO`
objects. Messages from `pid` to current process will be read off `instrm`, while messages to
be sent to `pid` will be written to `outstrm`. The custom transport implementation must
ensure that messages are delivered and received completely and in order.
`connect(manager::ClusterManager.....)` sets up TCP/IP socket connections in-between
`connect(manager, ...)` sets up TCP/IP socket connections in-between
workers.
"""
function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
function connect(manager, pid::Int, config::WorkerConfig)
if config.connect_at !== nothing
# this is a worker-to-worker setup call.
return connect_w2w(pid, config)
Expand Down Expand Up @@ -728,15 +728,15 @@ function connect_to_worker(host::AbstractString, port::Integer)

iptype = typeof(bind_addr)
sock = socket_reuse_port(iptype)
connect(sock, bind_addr, UInt16(port))
Sockets.connect(sock, bind_addr, UInt16(port))

(sock, string(bind_addr))
end


function connect_to_worker_with_tunnel(host::AbstractString, bind_addr::AbstractString, port::Integer, tunnel_user::AbstractString, sshflags, multiplex)
localport = ssh_tunnel(tunnel_user, host, bind_addr, UInt16(port), sshflags, multiplex)
s = connect("localhost", localport)
s = Sockets.connect("localhost", localport)
forward = "$localport:$bind_addr:$port"
(s, bind_addr, forward)
end
Expand All @@ -755,15 +755,16 @@ end


"""
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
kill(manager, pid::Int, config::WorkerConfig)

Implemented by cluster managers.
It is called on the master process, by [`rmprocs`](@ref).
It should cause the remote worker specified by `pid` to exit.
`kill(manager::ClusterManager.....)` executes a remote `exit()`
`kill(manager, ...)` executes a remote `exit()`
on `pid`.
"""
function kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
function kill(manager, pid::Int, config::WorkerConfig)
check_cluster_manager(manager)
remote_do(exit, pid)
nothing
end
Expand Down
6 changes: 3 additions & 3 deletions src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ end
function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool)
Sockets.nagle(r_stream, false)
Sockets.quickack(r_stream, true)
wait_connected(r_stream)
Sockets.wait_connected(r_stream)
if r_stream != w_stream
Sockets.nagle(w_stream, false)
Sockets.quickack(w_stream, true)
wait_connected(w_stream)
Sockets.wait_connected(w_stream)
end
message_handler_loop(r_stream, w_stream, incoming)
end
Expand Down Expand Up @@ -367,7 +367,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
send_msg_now(controller, MsgHeader(RRID(0,0), header.notify_oid), JoinCompleteMsg(Sys.CPU_THREADS, getpid()))
end

function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig)
function connect_to_peer(manager, rpid::Int, wconfig::WorkerConfig)
try
(r_s, w_s) = connect(manager, rpid, wconfig)
w = Worker(rpid, r_s, w_s, manager; config=wconfig)::Worker
Expand Down
10 changes: 10 additions & 0 deletions test/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,13 @@ using DistributedNext: parse_machine, SSHManager, LocalManager
sprint((t,x) -> show(t, "text/plain", x), SSHManager("127.0.0.1")))
@test sprint((t,x) -> show(t, "text/plain", x), LocalManager(1, true)) == "LocalManager()"
end

# `@testset` wraps its body in a local (non-top-level) scope, and Julia only
# allows `struct` declarations at top level, so the dummy manager type must be
# defined outside the testset or the file fails to load with
# `syntax: "struct" expression not at top level`.
struct NotAManager end

@testset "is_cluster_manager trait" begin
    # Subtypes of ClusterManager opt in automatically via the trait fallback.
    @test DistributedNext.is_cluster_manager(LocalManager(1, true))

    # Arbitrary types do not opt in, and passing one to `addprocs` throws.
    @test !DistributedNext.is_cluster_manager(NotAManager())
    @test_throws ArgumentError DistributedNext.addprocs(NotAManager())
end
Loading