From ed3b84f06fd424eb3786add714e208f2a009320a Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 15:06:34 +1200 Subject: [PATCH] Add Falcon cluster service environment --- lib/falcon/environment/cluster.rb | 51 ++++++++++++++++++ lib/falcon/service/cluster.rb | 65 +++++++++++++++++++++++ lib/falcon/service/server.rb | 20 +++++-- releases.md | 1 + test/falcon/environment/cluster.rb | 24 +++++++++ test/falcon/service/cluster.rb | 84 ++++++++++++++++++++++++++++++ 6 files changed, 240 insertions(+), 5 deletions(-) create mode 100644 lib/falcon/environment/cluster.rb create mode 100644 lib/falcon/service/cluster.rb create mode 100644 test/falcon/environment/cluster.rb create mode 100644 test/falcon/service/cluster.rb diff --git a/lib/falcon/environment/cluster.rb b/lib/falcon/environment/cluster.rb new file mode 100644 index 0000000..862d0f1 --- /dev/null +++ b/lib/falcon/environment/cluster.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Samuel Williams. + +require_relative "server" +require_relative "../service/cluster" + +module Falcon + module Environment + # Provides an environment for hosting a cluster of Falcon server workers, where each worker binds its own endpoint. + module Cluster + include Server + + # The service class to use for the cluster. + # @returns [Class] + def service_class + Service::Cluster + end + + # The host that this server will receive connections for. + def url + "http://[::]:0" + end + + # The endpoint bound by the current worker. + # @returns [IO::Endpoint::BoundEndpoint | Nil] + def bound_endpoint + @bound_endpoint + end + + # Set the endpoint bound by the current worker. + # @parameter bound_endpoint [IO::Endpoint::BoundEndpoint] + def bound_endpoint=(bound_endpoint) + @bound_endpoint = bound_endpoint + end + + # The first socket address bound by the current worker. + # @returns [Addrinfo | Nil] + def bound_address + @bound_endpoint&.sockets&.first&.to_io&.local_address + end + + # The port bound by the current worker. + # @returns [Integer | Nil] + def bound_port + bound_address&.ip_port + end + end + end +end diff --git a/lib/falcon/service/cluster.rb b/lib/falcon/service/cluster.rb new file mode 100644 index 0000000..d09d8a2 --- /dev/null +++ b/lib/falcon/service/cluster.rb @@ -0,0 +1,65 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Samuel Williams. + +require_relative "server" + +module Falcon + # @namespace + module Service + # A managed service for running Falcon workers with independently bound endpoints. + class Cluster < Server + # Cluster workers bind independently in their own process. + def bind_endpoint + end + + # Setup the service into the specified container. + # @parameter container [Async::Container] The container to configure. + def setup(container) + container_options = @evaluator.container_options + health_check_timeout = container_options[:health_check_timeout] + + container.run(**container_options) do |instance| + clock = Async::Clock.start + bound_endpoint = nil + + begin + Async do |task| + evaluator = self.environment.evaluator + server = nil + + health_checker(instance, health_check_timeout) do + if server + instance.name = format_title(evaluator, server) + end + end + + instance.status!("Preparing...") + + bound_endpoint = evaluator.endpoint.bound + evaluator.bound_endpoint = bound_endpoint + + evaluator.prepare!(instance) + emit_prepared(instance, clock) + + instance.status!("Running...") + server = run(instance, evaluator, bound_endpoint) + instance.name = format_title(evaluator, server) + emit_running(instance, clock) + + instance.ready! + + sleep + ensure + bound_endpoint&.close + task&.children&.each(&:stop) + end + ensure + bound_endpoint&.close + end + end + end + end + end +end diff --git a/lib/falcon/service/server.rb b/lib/falcon/service/server.rb index c3061bc..f5e7f84 100644 --- a/lib/falcon/service/server.rb +++ b/lib/falcon/service/server.rb @@ -21,8 +21,8 @@ def initialize(...) @bound_endpoint = nil end - # Prepare the bound endpoint for the server. - def start + # Bind the endpoint used by each server worker. + def bind_endpoint @endpoint = @evaluator.endpoint Sync do @@ -30,6 +30,11 @@ def start end Console.info(self){"Starting #{self.name} on #{@endpoint}"} + end + + # Prepare the bound endpoint for the server. + def start + bind_endpoint super end @@ -38,20 +43,25 @@ def start # # @parameter instance [Object] The container instance. # @parameter evaluator [Environment::Evaluator] The environment evaluator. + # @parameter bound_endpoint [IO::Endpoint] The endpoint bound by this worker. # @returns [Falcon::Server] The server instance. - def run(instance, evaluator) + def run(instance, evaluator, bound_endpoint = @bound_endpoint) if evaluator.respond_to?(:make_supervised_worker) Console.warn(self, "Async::Container::Supervisor is replaced by Async::Service::Supervisor, please update your service definition.") evaluator.make_supervised_worker(instance).run end - server = evaluator.make_server(@bound_endpoint) + server = evaluator.make_server(bound_endpoint) Async do |task| server.run - task.children&.each(&:wait) + begin + task.children&.each(&:wait) + rescue IOError + raise unless bound_endpoint.respond_to?(:sockets) && bound_endpoint.sockets.empty? + end end server diff --git a/releases.md b/releases.md index 7b76ef1..9267aa9 100644 --- a/releases.md +++ b/releases.md @@ -2,6 +2,7 @@ ## Unreleased + - Add `Falcon::Environment::Cluster` and `Falcon::Service::Cluster` for running workers with independently bound endpoints. - Move Falcon middleware trace providers to `traces/provider/falcon/middleware`. ## v0.55.5 diff --git a/test/falcon/environment/cluster.rb b/test/falcon/environment/cluster.rb new file mode 100644 index 0000000..09a9fc4 --- /dev/null +++ b/test/falcon/environment/cluster.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Samuel Williams. + +require "falcon/environment/cluster" +require "async/service/environment" + +describe Falcon::Environment::Cluster do + let(:evaluator) do + Async::Service::Environment.build(subject, name: "localhost").evaluator + end + + it "provides default cluster settings" do + expect(evaluator).to have_attributes( + service_class: be == Falcon::Service::Cluster, + url: be == "http://[::]:0", + authority: be == "localhost", + bound_endpoint: be == nil, + bound_address: be == nil, + bound_port: be == nil, + ) + end +end diff --git a/test/falcon/service/cluster.rb b/test/falcon/service/cluster.rb new file mode 100644 index 0000000..a7a75e7 --- /dev/null +++ b/test/falcon/service/cluster.rb @@ -0,0 +1,84 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Samuel Williams. + +require "falcon/service/cluster" +require "falcon/environment/cluster" +require "async/container" +require "async/service/environment" +require "fileutils" +require "net/http" +require "protocol/http/middleware" + +describe Falcon::Service::Cluster do + let(:ports_path) {File.expand_path(".cluster/ports.txt", __dir__)} + + let(:recorder) do + path = ports_path + + Module.new do + def middleware + Protocol::HTTP::Middleware::HelloWorld + end + + def container_options + super.merge(restart: false) + end + + define_method(:prepare!) do |instance| + super(instance) + + File.open(path, "a") do |file| + file.puts(bound_port) + end + end + end + end + + let(:environment) do + Async::Service::Environment.new(Falcon::Environment::Cluster).with( + recorder, + name: "hello", + root: File.expand_path(".cluster/hello", __dir__), + url: "http://127.0.0.1:0", + count: 2, + ) + end + + let(:server) do + subject.new(environment) + end + + before do + FileUtils.rm_rf(File.dirname(ports_path)) + FileUtils.mkdir_p(File.dirname(ports_path)) + end + + after do + FileUtils.rm_rf(File.dirname(ports_path)) + end + + it "binds a unique port for each worker before preparing the instance" do + container = Async::Container.new + + server.start + server.setup(container) + container.wait_until_ready + + ports = File.readlines(ports_path, chomp: true).map(&:to_i) + + expect(ports).to have_attributes(size: be == 2) + expect(ports).to have_value(be > 0) + expect(ports.uniq).to be == ports + + ports.each do |port| + response = Net::HTTP.get_response(URI("http://127.0.0.1:#{port}/")) + + expect(response).to have_attributes(code: be == "200") + end + ensure + server.stop + container&.stop + end +end