File size: 3,411 Bytes
7def60a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
//go:build p2p
// +build p2p

package p2p

import (
	"context"
	"errors"
	"fmt"
	"net"
	"time"

	"math/rand/v2"

	"github.com/mudler/edgevpn/pkg/node"
	"github.com/mudler/edgevpn/pkg/protocol"
	"github.com/mudler/edgevpn/pkg/types"
	"github.com/rs/zerolog/log"
)

func (f *FederatedServer) Start(ctx context.Context) error {

	n, err := NewNode(f.p2ptoken)
	if err != nil {
		return fmt.Errorf("creating a new node: %w", err)
	}
	err = n.Start(ctx)
	if err != nil {
		return fmt.Errorf("creating a new node: %w", err)
	}

	if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) {
		log.Debug().Msgf("Discovered node: %s", tunnel.ID)
	}); err != nil {
		return err
	}

	return f.proxy(ctx, n)
}

func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {

	log.Info().Msgf("Allocating service '%s' on: %s", fs.service, fs.listenAddr)
	// Open local port for listening
	l, err := net.Listen("tcp", fs.listenAddr)
	if err != nil {
		log.Error().Err(err).Msg("Error listening")
		return err
	}
	//	ll.Info("Binding local port on", srcaddr)

	ledger, _ := node.Ledger()

	// Announce ourselves so nodes accepts our connection
	ledger.Announce(
		ctx,
		10*time.Second,
		func() {
			// Retrieve current ID for ip in the blockchain
			//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
			// If mismatch, update the blockchain
			//if !found {
			updatedMap := map[string]interface{}{}
			updatedMap[node.Host().ID().String()] = &types.User{
				PeerID:    node.Host().ID().String(),
				Timestamp: time.Now().String(),
			}
			ledger.Add(protocol.UsersLedgerKey, updatedMap)
			//	}
		},
	)

	defer l.Close()
	for {
		select {
		case <-ctx.Done():
			return errors.New("context canceled")
		default:
			log.Debug().Msg("New for connection")
			// Listen for an incoming connection.
			conn, err := l.Accept()
			if err != nil {
				fmt.Println("Error accepting: ", err.Error())
				continue
			}

			// Handle connections in a new goroutine, forwarding to the p2p service
			go func() {
				var tunnelAddresses []string
				for _, v := range GetAvailableNodes(fs.service) {
					if v.IsOnline() {
						tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
					} else {
						log.Info().Msgf("Node %s is offline", v.ID)
					}
				}

				if len(tunnelAddresses) == 0 {
					log.Error().Msg("No available nodes yet")
					return
				}

				tunnelAddr := ""

				if fs.loadBalanced {
					for _, t := range tunnelAddresses {
						fs.EnsureRecordExist(t)
					}

					tunnelAddr = fs.SelectLeastUsedServer()
					log.Debug().Msgf("Selected tunnel %s", tunnelAddr)
					if tunnelAddr == "" {
						tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))]
					}

					fs.RecordRequest(tunnelAddr)
				} else {
					tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))]
				}

				tunnelConn, err := net.Dial("tcp", tunnelAddr)
				if err != nil {
					log.Error().Err(err).Msg("Error connecting to tunnel")
					return
				}

				log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String())
				closer := make(chan struct{}, 2)
				go copyStream(closer, tunnelConn, conn)
				go copyStream(closer, conn, tunnelConn)
				<-closer

				tunnelConn.Close()
				conn.Close()
				//	ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
			}()
		}
	}

}