Skip to content

Ref/rename stream mat #420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 40 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
ead7268
feat: added stream pipeline support
May 9, 2025
fbc926c
feat: add streams app example
May 9, 2025
e3d91ca
chore: added reflection to hello streams example for easy testing
May 9, 2025
146c0f8
feat: added new factory function
May 9, 2025
342a639
feat: added support for header propagation
May 9, 2025
513e811
feat: added support for header propagation
May 9, 2025
f21e6c7
Merge branch 'master' into feat/new-streams-api
May 9, 2025
2bf9b77
chore: adjusts
May 9, 2025
c0f997f
feat: added new replace operator
May 9, 2025
39fa3e3
new line
May 10, 2025
69ed8d2
refactor: propagate context option
May 10, 2025
b09ff07
refactor: rename functions
May 10, 2025
2881a2d
refactor: removing the destructive version
May 10, 2025
fc39729
Update lib/grpc/stream.ex
sleipnir May 10, 2025
c42ddae
adjusts
May 10, 2025
03e39e9
Merge branch 'feat/new-streams-api' of https://github.com/sleipnir/gr…
May 10, 2025
eab7ffc
refactor: separating unary from non-unary
May 10, 2025
3821add
Update lib/grpc/stream.ex
sleipnir May 10, 2025
50aa725
Update lib/grpc/stream.ex
sleipnir May 10, 2025
bb2b9ec
Update lib/grpc/stream.ex
sleipnir May 10, 2025
3682711
Update lib/grpc/stream.ex
sleipnir May 10, 2025
7ca091f
chore: remove reject function
May 10, 2025
9cc7b65
chore: resolve conflict
May 10, 2025
588d5d8
Update lib/grpc/stream.ex
sleipnir May 10, 2025
777e3a5
chore: format
May 10, 2025
817dd81
Update lib/grpc/stream.ex
sleipnir May 10, 2025
071ed9e
refactor: rename operator reject to via
May 10, 2025
afbf33e
chore: pointing flow docs
May 10, 2025
a70feea
fix: correct api examples
May 10, 2025
69d7ba5
chore: change ask and join_with apis
May 10, 2025
fcec891
Merge branch 'feat/new-streams-api' of https://github.com/sleipnir/gr…
May 10, 2025
a40f36c
refactor: rename single to unary
May 10, 2025
d3c8a03
refactor: rename single to unary in tests
May 10, 2025
87fc8e9
chore: remove unascessary function
May 10, 2025
aba380e
chore: removes a not so necessary function
May 10, 2025
f90aee1
refactor: rename server stream to materializer
May 11, 2025
cdfae86
fix: capture exits in report test
May 11, 2025
5b7afdd
fix: formatting
May 11, 2025
ae8c5a0
chore: skip test
May 11, 2025
af51b29
chore: skip test
May 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ In the following sections you will see how to implement gRPC server logic.
defmodule Helloworld.Greeter.Server do
use GRPC.Server, service: Helloworld.Greeter.Service

@spec say_hello(Helloworld.HelloRequest.t, GRPC.Server.Stream.t) :: Helloworld.HelloReply.t
def say_hello(request, _stream) do
@spec say_hello(Helloworld.HelloRequest.t, GRPC.Server.Materializer.t) :: Helloworld.HelloReply.t
def say_hello(request, _mat) do
Helloworld.HelloReply.new(message: "Hello #{request.name}")
end
end
Expand Down Expand Up @@ -160,8 +160,8 @@ defmodule Helloworld.Greeter.Server do
service: Helloworld.Greeter.Service,
http_transcode: true

@spec say_hello(Helloworld.HelloRequest.t, GRPC.Server.Stream.t) :: Helloworld.HelloReply.t
def say_hello(request, _stream) do
@spec say_hello(Helloworld.HelloRequest.t, GRPC.Server.Materializer.t) :: Helloworld.HelloReply.t
def say_hello(request, _mat) do
%Helloworld.HelloReply{message: "Hello #{request.name}"}
end
end
Expand Down
4 changes: 2 additions & 2 deletions examples/helloworld/lib/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Helloworld.Greeter.Server do
service: Helloworld.Greeter.Service,
http_transcode: true

@spec say_hello(Helloworld.HelloRequest.t(), GRPC.Server.Stream.t()) ::
@spec say_hello(Helloworld.HelloRequest.t(), GRPC.Server.Materializer.t()) ::
Helloworld.HelloReply.t()
def say_hello(request, _stream) do
%Helloworld.HelloReply{
Expand All @@ -12,7 +12,7 @@ defmodule Helloworld.Greeter.Server do
}
end

@spec say_hello_from(Helloworld.HelloFromRequest.t(), GRPC.Server.Stream.t()) ::
@spec say_hello_from(Helloworld.HelloFromRequest.t(), GRPC.Server.Materializer.t()) ::
Helloworld.HelloReply.t()
def say_hello_from(request, _stream) do
%Helloworld.HelloReply{
Expand Down
4 changes: 4 additions & 0 deletions examples/helloworld_streams/.formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
26 changes: 26 additions & 0 deletions examples/helloworld_streams/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where third-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
helloworld_streams-*.tar

# Temporary files, for example, from tests.
/tmp/
21 changes: 21 additions & 0 deletions examples/helloworld_streams/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# HelloworldStreams

**TODO: Add description**

## Installation

If [available in Hex](https://hex.pm/docs/publish), the package can be installed
by adding `helloworld_streams` to your list of dependencies in `mix.exs`:

```elixir
def deps do
[
{:helloworld_streams, "~> 0.1.0"}
]
end
```

Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
be found at <https://hexdocs.pm/helloworld_streams>.

5 changes: 5 additions & 0 deletions examples/helloworld_streams/lib/helloworld_streams.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule HelloworldStreams do
@moduledoc """
Documentation for `HelloworldStreams`.
"""
end
19 changes: 19 additions & 0 deletions examples/helloworld_streams/lib/helloworld_streams/application.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule HelloworldStreams.Application do
@moduledoc false
use Application

@impl true
def start(_type, _args) do
children = [
HelloworldStreams.Utils.Transformer,
GrpcReflection,
{
GRPC.Server.Supervisor,
endpoint: HelloworldStreams.Endpoint, port: 50053, start_server: true
}
]

opts = [strategy: :one_for_one, name: HelloworldStreams.Supervisor]
Supervisor.start_link(children, opts)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
defmodule HelloworldStreams.Endpoint do
@moduledoc false
use GRPC.Endpoint

intercept(GRPC.Server.Interceptors.Logger)
run(HelloworldStreams.Utils.Reflection)
run(HelloworldStreams.Server)
end
61 changes: 61 additions & 0 deletions examples/helloworld_streams/lib/helloworld_streams/server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule HelloworldStreams.Server do
@moduledoc """
gRPC service for streaming data.
"""
use GRPC.Server, service: Stream.EchoServer.Service

alias HelloworldStreams.Utils.Transformer
alias GRPC.Stream, as: GRPCStream

alias Stream.HelloRequest
alias Stream.HelloReply

@spec say_unary_hello(HelloRequest.t(), GRPC.Server.Materializer.t()) :: any()
def say_unary_hello(request, _materializer) do
GRPCStream.unary(request)
|> GRPCStream.ask(Transformer)
|> GRPCStream.map(fn %HelloReply{} = reply ->
%HelloReply{message: "[Reply] #{reply.message}"}
end)
|> GRPCStream.run()
end

@spec say_server_hello(HelloRequest.t(), GRPC.Server.Materializer.t()) :: any()
def say_server_hello(request, materializer) do
create_output_stream(request)
|> GRPCStream.from()
|> GRPCStream.run_with(materializer)
end

defp create_output_stream(msg) do
Stream.repeatedly(fn ->
index = :rand.uniform(10)
%HelloReply{message: "[#{index}] I'm the Server for #{msg.name}"}
end)
|> Stream.take(10)
|> Enum.to_list()
end

@spec say_bid_stream_hello(Enumerable.t(), GRPC.Server.Materializer.t()) :: any()
def say_bid_stream_hello(request, materializer) do
# simulate a infinite stream of data
# this is a simple example, in a real world application
# you would probably use a GenStage or similar
# to handle the stream of data
output_stream =
Stream.repeatedly(fn ->
index = :rand.uniform(10)
%HelloReply{message: "[#{index}] I'm the Server ;)"}
end)

GRPCStream.from(request, join_with: output_stream)
|> GRPCStream.map(fn
%HelloRequest{} = hello ->
%HelloReply{message: "Welcome #{hello.name}"}

output_item ->
output_item
end)
|> GRPCStream.run_with(materializer)
end
end
156 changes: 156 additions & 0 deletions examples/helloworld_streams/lib/helloworld_streams/stream.pb.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
defmodule Stream.HelloRequest do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3

def descriptor do
# credo:disable-for-next-line
%Google.Protobuf.DescriptorProto{
name: "HelloRequest",
field: [
%Google.Protobuf.FieldDescriptorProto{
name: "name",
extendee: nil,
number: 1,
label: :LABEL_OPTIONAL,
type: :TYPE_STRING,
type_name: nil,
default_value: nil,
options: nil,
oneof_index: nil,
json_name: "name",
proto3_optional: nil,
__unknown_fields__: []
}
],
nested_type: [],
enum_type: [],
extension_range: [],
extension: [],
options: nil,
oneof_decl: [],
reserved_range: [],
reserved_name: [],
__unknown_fields__: []
}
end

field(:name, 1, type: :string)
end

defmodule Stream.HelloReply do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3

def descriptor do
# credo:disable-for-next-line
%Google.Protobuf.DescriptorProto{
name: "HelloReply",
field: [
%Google.Protobuf.FieldDescriptorProto{
name: "message",
extendee: nil,
number: 1,
label: :LABEL_OPTIONAL,
type: :TYPE_STRING,
type_name: nil,
default_value: nil,
options: nil,
oneof_index: nil,
json_name: "message",
proto3_optional: nil,
__unknown_fields__: []
}
],
nested_type: [],
enum_type: [],
extension_range: [],
extension: [],
options: nil,
oneof_decl: [],
reserved_range: [],
reserved_name: [],
__unknown_fields__: []
}
end

field(:message, 1, type: :string)
end

defmodule Stream.EchoServer.Service do
@moduledoc false

use GRPC.Service, name: "stream.EchoServer", protoc_gen_elixir_version: "0.14.0"

def descriptor do
# credo:disable-for-next-line
%Google.Protobuf.ServiceDescriptorProto{
name: "EchoServer",
method: [
%Google.Protobuf.MethodDescriptorProto{
name: "SayUnaryHello",
input_type: ".stream.HelloRequest",
output_type: ".stream.HelloReply",
options: %Google.Protobuf.MethodOptions{
deprecated: false,
idempotency_level: :IDEMPOTENCY_UNKNOWN,
features: nil,
uninterpreted_option: [],
__pb_extensions__: %{},
__unknown_fields__: []
},
client_streaming: false,
server_streaming: false,
__unknown_fields__: []
},
%Google.Protobuf.MethodDescriptorProto{
name: "SayServerHello",
input_type: ".stream.HelloRequest",
output_type: ".stream.HelloReply",
options: %Google.Protobuf.MethodOptions{
deprecated: false,
idempotency_level: :IDEMPOTENCY_UNKNOWN,
features: nil,
uninterpreted_option: [],
__pb_extensions__: %{},
__unknown_fields__: []
},
client_streaming: false,
server_streaming: true,
__unknown_fields__: []
},
%Google.Protobuf.MethodDescriptorProto{
name: "SayBidStreamHello",
input_type: ".stream.HelloRequest",
output_type: ".stream.HelloReply",
options: %Google.Protobuf.MethodOptions{
deprecated: false,
idempotency_level: :IDEMPOTENCY_UNKNOWN,
features: nil,
uninterpreted_option: [],
__pb_extensions__: %{},
__unknown_fields__: []
},
client_streaming: true,
server_streaming: true,
__unknown_fields__: []
}
],
options: nil,
__unknown_fields__: []
}
end

rpc(:SayUnaryHello, Stream.HelloRequest, Stream.HelloReply)

rpc(:SayServerHello, Stream.HelloRequest, stream(Stream.HelloReply))

rpc(:SayBidStreamHello, stream(Stream.HelloRequest), stream(Stream.HelloReply))
end

defmodule Stream.EchoServer.Stub do
@moduledoc false

use GRPC.Stub, service: Stream.EchoServer.Service
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
defmodule HelloworldStreams.Utils.Reflection do
@moduledoc """
gRPC reflection server.
"""
use GrpcReflection.Server,
version: :v1,
services: [Stream.EchoServer.Service]
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule HelloworldStreams.Utils.Transformer do
@moduledoc """
`Transformer` GenServer for example purposes.
"""
use GenServer

alias Stream.HelloRequest
alias Stream.HelloReply

def start_link(_) do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end

def init(_), do: {:ok, %{}}

def handle_info({:request, %HelloRequest{} = value, from}, state) do
Process.send(from, {:response, %HelloReply{message: "Hello #{value.name}"}}, [])
{:noreply, state}
end
end
Loading
Loading