package p2p

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"sync"
	"time"

	"github.com/ipfs/go-log"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/mudler/LocalAI/pkg/utils"
	"github.com/mudler/edgevpn/pkg/config"
	"github.com/mudler/edgevpn/pkg/logger"
	"github.com/mudler/edgevpn/pkg/node"
	"github.com/mudler/edgevpn/pkg/protocol"
	"github.com/mudler/edgevpn/pkg/services"
	"github.com/mudler/edgevpn/pkg/types"
	"github.com/phayes/freeport"
	zlog "github.com/rs/zerolog/log"
)
|
// GenerateToken generates a new network token that other peers can use to
// join the same p2p network.
func GenerateToken() string {
	newData := node.GenerateNewConnectionData(900)
	return newData.Base64()
}
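
// A minimal usage sketch: generate a token on one machine and share it out
// of band so other peers can join the same network:
//
//	token := GenerateToken()
//	fmt.Println(token) // hand this string to the other peers
//	n, err := NewNode(token)
//	if err != nil {
//		// handle error
//	}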
|
// IsP2PEnabled reports whether p2p support is available in this build.
func IsP2PEnabled() bool {
	return true
}
|
// nodeID derives a stable node identifier from the local hostname and the
// given suffix.
func nodeID(s string) string {
	hostname, _ := os.Hostname()
	return fmt.Sprintf("%s-%s", hostname, s)
}
|
// allocateLocalService listens on listenAddr and proxies every accepted
// connection to the peer that announced the given service on the ledger.
func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
	zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)

	l, err := net.Listen("tcp", listenAddr)
	if err != nil {
		zlog.Error().Err(err).Msg("Error listening")
		return err
	}
	defer l.Close()

	ledger, _ := node.Ledger()

	// Keep our entry in the users ledger fresh while the tunnel is alive.
	ledger.Announce(
		ctx,
		10*time.Second,
		func() {
			updatedMap := map[string]interface{}{}
			updatedMap[node.Host().ID().String()] = &types.User{
				PeerID:    node.Host().ID().String(),
				Timestamp: time.Now().String(),
			}
			ledger.Add(protocol.UsersLedgerKey, updatedMap)
		},
	)

	for {
		select {
		case <-ctx.Done():
			return errors.New("context canceled")
		default:
			zlog.Debug().Msg("Waiting for connection")

			conn, err := l.Accept()
			if err != nil {
				zlog.Error().Err(err).Msg("Error accepting connection")
				continue
			}

			go func() {
				// Look up the service entry on the ledger before unmarshalling it.
				existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, service)
				if !found {
					zlog.Error().Msg("Service not found on blockchain")
					conn.Close()
					return
				}

				service := &types.Service{}
				existingValue.Unmarshal(service)

				d, err := peer.Decode(service.PeerID)
				if err != nil {
					zlog.Error().Msg("cannot decode peer")
					conn.Close()
					return
				}

				// Open a libp2p stream to the announcing peer and splice it
				// with the local connection in both directions.
				stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
				if err != nil {
					zlog.Error().Msg("cannot open stream peer")
					conn.Close()
					return
				}

				zlog.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), stream.Conn().RemoteMultiaddr().String())
				closer := make(chan struct{}, 2)
				go copyStream(closer, stream, conn)
				go copyStream(closer, conn, stream)
				<-closer

				stream.Close()
				conn.Close()
			}()
		}
	}
}
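
// A minimal sketch of how a tunnel is wired up, assuming a started node n
// whose ledger already lists a (hypothetical) service named "myservice":
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	go allocateLocalService(ctx, n, "127.0.0.1:9090", "myservice")
//	// connections to 127.0.0.1:9090 are now proxied to the remote peer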
|
// ServiceDiscoverer starts the node and watches the ledger for services
// announced under servicesID; every discovered worker is registered via
// AddNode and, if set, reported through discoveryFunc.
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData)) error {
	if servicesID == "" {
		servicesID = defaultServicesID
	}
	tunnels, err := discoveryTunnels(ctx, n, token, servicesID)
	if err != nil {
		return err
	}

	// Consume discovered tunnels until the context is canceled.
	go func() {
		for {
			select {
			case <-ctx.Done():
				zlog.Debug().Msg("Discoverer stopped")
				return
			case tunnel := <-tunnels:
				AddNode(servicesID, tunnel)
				if discoveryFunc != nil {
					discoveryFunc(servicesID, tunnel)
				}
			}
		}
	}()

	return nil
}
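
// A minimal sketch of discovering workers, assuming the token was shared out
// of band and the default services ID is used (empty servicesID):
//
//	n, err := NewNode(token)
//	if err != nil {
//		// handle error
//	}
//	err = ServiceDiscoverer(ctx, n, token, "", func(id string, worker NodeData) {
//		zlog.Info().Msgf("discovered worker %s", worker.ID)
//	})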
|
func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string) (chan NodeData, error) {
	tunnels := make(chan NodeData)

	err := n.Start(ctx)
	if err != nil {
		return nil, fmt.Errorf("starting the node: %w", err)
	}
	ledger, err := n.Ledger()
	if err != nil {
		return nil, fmt.Errorf("getting the ledger: %w", err)
	}

	// Poll the ledger for workers announced under servicesID, make sure each
	// online worker has a local tunnel, and push its data to the channel.
	go func() {
		for {
			select {
			case <-ctx.Done():
				zlog.Debug().Msg("Discoverer stopped")
				return
			default:
				time.Sleep(5 * time.Second)
				zlog.Debug().Msg("Searching for workers")

				data := ledger.LastBlock().Storage[servicesID]
				for k, v := range data {
					zlog.Info().Msgf("Found worker %s", k)
					nd := &NodeData{}
					if err := v.Unmarshal(nd); err != nil {
						zlog.Error().Msg("cannot unmarshal node data")
						continue
					}
					ensureService(ctx, n, nd, k)
					muservice.Lock()
					if _, ok := service[nd.Name]; ok {
						tunnels <- service[nd.Name].NodeData
					}
					muservice.Unlock()
				}
			}
		}
	}()

	return tunnels, nil
}
|
// nodeServiceData pairs a discovered node with the cancel function that
// tears down its local tunnel.
type nodeServiceData struct {
	NodeData   NodeData
	CancelFunc context.CancelFunc
}

// service tracks allocated tunnels by node name; muservice guards it.
var service = map[string]nodeServiceData{}
var muservice sync.Mutex
|
// ensureService allocates a local tunnel for a node that is online and not
// yet tracked, and removes the tunnel once the node goes offline.
func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string) {
	muservice.Lock()
	defer muservice.Unlock()
	if ndService, found := service[nd.Name]; !found {
		if !nd.IsOnline() {
			// Never allocate a tunnel for an offline node.
			return
		}
		newCtxm, cancel := context.WithCancel(ctx)

		port, err := freeport.GetFreePort()
		if err != nil {
			zlog.Error().Err(err).Msg("cannot allocate a free port")
			cancel()
			return
		}
		tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
		nd.TunnelAddress = tunnelAddress
		service[nd.Name] = nodeServiceData{
			NodeData:   *nd,
			CancelFunc: cancel,
		}
		go allocateLocalService(newCtxm, n, tunnelAddress, sserv)
		zlog.Debug().Msgf("Starting service %s on %s", sserv, tunnelAddress)
	} else {
		// Node already tracked: drop it if it went offline, otherwise refresh
		// its entry while keeping the existing tunnel.
		if !nd.IsOnline() && !ndService.NodeData.IsOnline() {
			ndService.CancelFunc()
			delete(service, nd.Name)
			zlog.Info().Msgf("Node %s is offline, deleting", nd.ID)
		} else if nd.IsOnline() {
			nd.TunnelAddress = ndService.NodeData.TunnelAddress
			service[nd.Name] = nodeServiceData{
				NodeData:   *nd,
				CancelFunc: ndService.CancelFunc,
			}
			zlog.Debug().Msgf("Node %s is still online", nd.ID)
		}
	}
}
|
// ExposeService announces a local host:port on the p2p network under
// servicesID so that remote peers can discover and tunnel to it.
func ExposeService(ctx context.Context, host, port, token, servicesID string) error {
	if servicesID == "" {
		servicesID = defaultServicesID
	}
	llger := logger.New(log.LevelFatal)

	nodeOpts, err := newNodeOpts(token)
	if err != nil {
		return err
	}

	// Register the service under a random name on the node.
	name := utils.RandString(10)

	nodeOpts = append(nodeOpts,
		services.RegisterService(llger, 60*time.Second, name, fmt.Sprintf("%s:%s", host, port))...)
	n, err := node.New(nodeOpts...)
	if err != nil {
		return fmt.Errorf("creating a new node: %w", err)
	}

	err = n.Start(ctx)
	if err != nil {
		return fmt.Errorf("starting the node: %w", err)
	}

	ledger, err := n.Ledger()
	if err != nil {
		return fmt.Errorf("getting the ledger: %w", err)
	}

	// Announce the service on the ledger so that discoverers can find it.
	ledger.Announce(
		ctx,
		20*time.Second,
		func() {
			updatedMap := map[string]interface{}{}
			updatedMap[name] = &NodeData{
				Name:     name,
				LastSeen: time.Now(),
				ID:       nodeID(name),
			}
			ledger.Add(servicesID, updatedMap)
		},
	)

	return err
}
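
// A minimal sketch of exposing a local service, e.g. a server listening on
// 127.0.0.1:8080 (address and port are illustrative):
//
//	if err := ExposeService(ctx, "127.0.0.1", "8080", token, ""); err != nil {
//		// handle error
//	}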
|
// NewNode creates a new p2p node joining the network identified by token.
// The node is not started; call Start on it (or use ServiceDiscoverer,
// which starts it for you).
func NewNode(token string) (*node.Node, error) {
	nodeOpts, err := newNodeOpts(token)
	if err != nil {
		return nil, err
	}

	n, err := node.New(nodeOpts...)
	if err != nil {
		return nil, fmt.Errorf("creating a new node: %w", err)
	}

	return n, nil
}
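
// A minimal lifecycle sketch: create the node, then start it with a
// cancellable context to join the network:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	n, err := NewNode(token)
//	if err != nil {
//		// handle error
//	}
//	if err := n.Start(ctx); err != nil {
//		// handle error
//	}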
|
// newNodeOpts builds the edgevpn node options for the given network token,
// honoring the LOCALAI_P2P_DISABLE_DHT and LOCALAI_P2P_DISABLE_LIMITS
// environment variables.
func newNodeOpts(token string) ([]node.Option, error) {
	llger := logger.New(log.LevelFatal)
	defaultInterval := 10 * time.Second

	noDHT := os.Getenv("LOCALAI_P2P_DISABLE_DHT") == "true"
	noLimits := os.Getenv("LOCALAI_P2P_DISABLE_LIMITS") == "true"

	loglevel := "info"

	c := config.Config{
		Limit: config.ResourceLimit{
			Enable:   !noLimits,
			MaxConns: 100,
		},
		NetworkToken:   token,
		LowProfile:     false,
		LogLevel:       loglevel,
		Libp2pLogLevel: "fatal",
		Ledger: config.Ledger{
			SyncInterval:     defaultInterval,
			AnnounceInterval: defaultInterval,
		},
		NAT: config.NAT{
			Service:           true,
			Map:               true,
			RateLimit:         true,
			RateLimitGlobal:   10,
			RateLimitPeer:     10,
			RateLimitInterval: defaultInterval,
		},
		Discovery: config.Discovery{
			DHT:      !noDHT, // DHT stays on unless explicitly disabled
			MDNS:     true,
			Interval: 30 * time.Second,
		},
		Connection: config.Connection{
			HolePunch:      true,
			AutoRelay:      true,
			MaxConnections: 100,
		},
	}

	nodeOpts, _, err := c.ToOpts(llger)
	if err != nil {
		return nil, fmt.Errorf("parsing options: %w", err)
	}

	nodeOpts = append(nodeOpts, services.Alive(30*time.Second, 900*time.Second, 15*time.Minute)...)

	return nodeOpts, nil
}
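
// The environment toggles read above can be set before starting the process,
// for example:
//
//	LOCALAI_P2P_DISABLE_DHT=true    // mDNS-only discovery (same LAN)
//	LOCALAI_P2P_DISABLE_LIMITS=true // lift connection/resource limits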
|
// copyStream copies src into dst and signals on closer when one direction
// finishes, so the caller can tear down both ends of the proxy.
func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
	defer func() { closer <- struct{}{} }()
	io.Copy(dst, src)
}
|
|