Need some help for using Port Forward feature #1411
Replies: 10 comments
-
sdk should work the same as kubectl from you ssh log, seems the connection stuck. did you test other algo? |
Beta Was this translation helpful? Give feedback.
-
Thank you for the reply. |
Beta Was this translation helpful? Give feedback.
-
ok allow me some time to repro and see if any bug in sdk |
Beta Was this translation helpful? Give feedback.
-
Thank you so much! |
Beta Was this translation helpful? Give feedback.
-
Can you send your port forwarding code? Did you see the example here: https://github.com/kubernetes-client/csharp/blob/master/examples/portforward/PortForward.cs#L41 Are you making sure to spawn multiple threads to handle the reading/writing simultaneously in both directions? Also you need to make sure there's no contention between those threads. |
Beta Was this translation helpful? Give feedback.
-
I have been working on a new port forwarder that you can try. Once I get it stabilized, I will PR it over here. using k8s;
using System.Net.NetworkInformation;
using System.Net;
using System.Net.Sockets;
namespace KubeUI.Core.Client;
public class PortForwarder
{
IKubernetes client;
TcpListener listener;
public string PodName { get; }
public string Namespace { get; }
public int ContainerPort { get; }
public int LocalPort { get; }
public string Status { get; private set; }
public PortForwarder(IKubernetes client, string podName, string @namespace, int containerPort, int localPort)
{
this.client = client;
PodName = podName;
Namespace = @namespace;
ContainerPort = containerPort;
LocalPort = localPort;
listener = new TcpListener(IPAddress.Loopback, localPort);
}
public void Start()
{
if (!IsPortAvailable(LocalPort))
{
Status = "Local port is busy";
return;
}
listener.Start();
listener.BeginAcceptSocket(new AsyncCallback(ClientConnected), null);
Status = "Active";
}
public void Stop()
{
listener.Stop();
Status = "Inactive";
}
private void ClientConnected(IAsyncResult result)
{
var socket = listener.EndAcceptSocket(result);
Task.Run(async () => await HandleConnection(socket));
listener.BeginAcceptSocket(new AsyncCallback(ClientConnected), null);
}
private async Task HandleConnection(Socket socket)
{
using var webSocket = await client.WebSocketNamespacedPodPortForwardAsync(PodName, Namespace, new int[] { ContainerPort }, "v4.channel.k8s.io");
using var demux = new StreamDemuxer(webSocket, StreamType.PortForward);
demux.Start();
using var stream = demux.GetStream((byte?)0, (byte?)0);
var read = Task.Run(() =>
{
var buffer = new byte[4096];
while (SocketConnected(socket))
{
var bytesReceived = socket.Receive(buffer);
stream.Write(buffer, 0, bytesReceived);
}
});
var write = Task.Run(() =>
{
var buffer = new byte[4096];
while (SocketConnected(socket))
{
var bytesReceived = stream.Read(buffer, 0, 4096);
socket.Send(buffer, bytesReceived, 0);
}
});
await read;
await write;
socket.Close();
}
private static bool IsPortAvailable(int port)
{
var properties = IPGlobalProperties.GetIPGlobalProperties();
var activeTcpListeners = properties.GetActiveTcpListeners();
return !activeTcpListeners.Any(x => x.Port == port);
}
private static bool SocketConnected(Socket s)
{
var part1 = s.Poll(1000, SelectMode.SelectRead);
var part2 = s.Available == 0;
if (part1 && part2)
return false;
else
return true;
}
} |
Beta Was this translation helpful? Give feedback.
-
Here is the port forwarded code I used. I had to cleanup the stuff that were not relevant. using k8s;
using Serilog;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace VWAC.VirtualVehicles.Support
{
public class ForwardedPort : IDisposable
{
private const int ReceiveBufferSize = 0xFFFF;
private const int SendBufferSize = 0xFFFF;
private readonly List<TcpConnection> _tcpConnections = new();
private readonly CancellationTokenSource _instanceCts = new CancellationTokenSource();
private TcpListener _tcpListener;
public ForwardedPort(int localPort)
{
LocalPort = localPort;
}
public Guid Id { get; }
public Guid ServiceId { get; }
public Guid VehicleId { get; }
public string TargetNodeName { get; }
public string TargetProtocol { get; }
public int? TargetPort { get; }
public int LocalPort { get; }
public void Initialize()
{
_tcpListener = new TcpListener(IPAddress.Loopback, LocalPort);
_tcpListener.Start();
_ = Task.Factory.StartNew(RunTcpListener);
}
private async Task RunTcpListener()
{
while (!_instanceCts.IsCancellationRequested)
{
TcpClient tcpClient = null;
TcpConnection tcpConnection = null;
try
{
tcpClient = await _tcpListener.AcceptTcpClientAsync();
tcpClient.NoDelay = true;
tcpClient.ReceiveBufferSize = ReceiveBufferSize;
tcpClient.SendBufferSize = SendBufferSize;
tcpConnection = new TcpConnection { TcpClient = tcpClient, PodSocketStream = await GetPodStream(), Stream = tcpClient.GetStream() };
tcpConnection.TunnelReceiveTask = Task.Factory.StartNew(() => RunTcpReceiveTunnel(tcpConnection));
tcpConnection.TunnelSendTask = Task.Factory.StartNew(() => RunTcpSendTunnel(tcpConnection));
lock (_tcpConnections)
_tcpConnections.Add(tcpConnection);
}
catch (Exception ex)
{
// TODO: Setup better handling + logging
if (tcpClient != null)
{
tcpClient.Dispose();
}
if (tcpConnection != null)
tcpConnection.Dispose();
}
}
}
private async Task RunTcpSendTunnel(TcpConnection tcpConnection)
{
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_instanceCts.Token, tcpConnection.ConnectionCancellationToken);
try
{
await RunTcpTunnel(new byte[SendBufferSize], tcpConnection.Stream, tcpConnection.PodSocketStream, cancellationTokenSource.Token);
}
catch
{
tcpConnection.Cancel();
// TODO: Setup better handling + logging and closing of the connection
}
finally
{
tcpConnection.Dispose(); // Only need to dispose once, so the send tunnel does it
}
lock (_tcpConnections)
_tcpConnections.Remove(tcpConnection);
}
private async Task RunTcpReceiveTunnel(TcpConnection tcpConnection)
{
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_instanceCts.Token, tcpConnection.ConnectionCancellationToken);
try
{
await RunTcpTunnel(new byte[ReceiveBufferSize], tcpConnection.PodSocketStream, tcpConnection.Stream, cancellationTokenSource.Token);
}
catch
{
tcpConnection.Cancel();
// TODO: Setup better handling + logging and closing of the connection
}
finally
{
//await tcpConnection.DisposeAsync(); // Only need to dispose once, so the send tunnel does it
}
lock (_tcpConnections)
_tcpConnections.Remove(tcpConnection);
}
private static async Task RunTcpTunnel(byte[] buffer, Stream readStream, Stream writeStream, CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
var readAmount = await readStream.ReadAsync(buffer, 0, buffer.Length, cancellationToken);
if (readAmount <= 0)
{
break;
}
await writeStream.WriteAsync(buffer, 0, readAmount, cancellationToken);
}
}
private async Task<Stream> GetPodStream()
{
var config = KubernetesClientConfiguration.BuildConfigFromConfigFile();
var client = new Kubernetes(config);
var webSocket = await client.WebSocketNamespacedPodPortForwardAsync("vv-4321f07e-124f-4b3f-b8e3-5402074c2c19-0", "virtual-vehicle", new int[] { 22 }, "v4.channel.k8s.io");
var demux = new StreamDemuxer(webSocket, StreamType.PortForward);
demux.Start();
var stream = demux.GetStream((byte?)0, (byte?)0);
return stream;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
// Determine if waiting for tcp listener task to complete is necessary
_tcpListener.Stop();
_instanceCts.Cancel();
TcpConnection[] connections;
lock (_tcpConnections)
connections = _tcpConnections.ToArray();
foreach (var tcpConnection in connections)
tcpConnection.Dispose();
_instanceCts.Dispose();
}
private class TcpConnection : IDisposable
{
private readonly CancellationTokenSource _connectionCts = new();
public CancellationToken ConnectionCancellationToken => _connectionCts.Token;
public TcpClient TcpClient { get; set; }
public Task TunnelReceiveTask { get; set; }
public Task TunnelSendTask { get; set; }
public Stream PodSocketStream { get; set; }
public NetworkStream Stream { get; set; }
public void Cancel()
{
try
{
_connectionCts.Cancel();
}
catch (Exception ex)
{
Log.Error(ex, "Cancel Error");
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
// TODO: Determine if waiting for tunnel receive/send task completion is necessary
try
{
_connectionCts.Dispose();
}
catch (Exception ex)
{
Log.Error(ex, "connectionCts Dispose Error");
}
TcpClient.Close();
TcpClient.Dispose();
Stream.Dispose();
}
}
}
} |
Beta Was this translation helpful? Give feedback.
-
@IvanJosipovic Thank you. I will give it a try. |
Beta Was this translation helpful? Give feedback.
-
@IvanJosipovic Good news!! The code you provided totally works!!! I am wondering what are the changes that make it working? |
Beta Was this translation helpful? Give feedback.
-
thanks all, could you please send a pr of working example code? |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
Hi all, here is an issue I bump into while using the WebSocketNamespacedPodPortForwardAsync() in the client.
My usecase is that I want to create a tunnel between the local port and the ssh port (22) on the pod. I try using the
Kubectl port-forward
command, and it's working just fine. When I try using the port-forward function from the Kubernetes's client, things are not working properly.Here is the ssh debug logging when I try to ssh into the pod:
//
OpenSSH_9.1p1, OpenSSL 1.1.1s 1 Nov 2022
debug1: Reading configuration data /c/Users/LeeTony(USAC-ER)/.ssh/config
debug1: Reading configuration data /etc/ssh/ssh_config
debug1: Connecting to 127.0.0.1 [127.0.0.1] port 22001.
debug1: Connection established.
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_rsa type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_rsa-cert type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ecdsa type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ecdsa-cert type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ecdsa_sk type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ecdsa_sk-cert type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ed25519 type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ed25519-cert type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ed25519_sk type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_ed25519_sk-cert type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_xmss type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_xmss-cert type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_dsa type -1
debug1: identity file /c/Users/LeeTony(USAC-ER)/.ssh/id_dsa-cert type -1
debug1: Local version string SSH-2.0-OpenSSH_9.1
debug1: Remote protocol version 2.0, remote software version OpenSSH_8.9p1 Ubuntu-3ubuntu0.4
debug1: compat_banner: match: OpenSSH_8.9p1 Ubuntu-3ubuntu0.4 pat OpenSSH compat 0x04000000
debug1: Authenticating to 127.0.0.1:22001 as 'tonylee'
debug1: load_hostkeys: fopen /c/Users/LeeTony(USAC-ER)/.ssh/known_hosts2: No such file or directory
debug1: load_hostkeys: fopen /etc/ssh/ssh_known_hosts: No such file or directory
debug1: load_hostkeys: fopen /etc/ssh/ssh_known_hosts2: No such file or directory
debug1: SSH2_MSG_KEXINIT sent
debug1: SSH2_MSG_KEXINIT received
debug1: kex: algorithm: sntrup761x25519-sha512@openssh.com
debug1: kex: host key algorithm: ssh-ed25519
debug1: kex: server->client cipher: chacha20-poly1305@openssh.com MAC: compression: none
debug1: kex: client->server cipher: chacha20-poly1305@openssh.com MAC: compression: none
debug1: expecting SSH2_MSG_KEX_ECDH_REPLY
/*/
It seems like the connection does get established, but it just stuck on waiting for the response of "SSH2_MSG_KEX_ECDH_REPLY". I tried googling the issue and tried what people suggested, but it still didn't work.
I am wondering if anyone has similar issue or knows what kind of issues it is and could point me to a direction.
Thank you all.
Beta Was this translation helpful? Give feedback.
All reactions