From 92cd3ebffe7f1c8517ff0509af812db1529ddc5a Mon Sep 17 00:00:00 2001 From: Gregory Bednov Date: Thu, 17 Jul 2025 13:42:18 +0300 Subject: [PATCH] still testing yggdrasil --- cli/testyggdrasil.go | 108 +++++++++++++++++++++++++++++++++++++-- yggdrasil/autopeering.go | 7 ++- yggdrasil/main.go | 10 ++-- 3 files changed, 113 insertions(+), 12 deletions(-) diff --git a/cli/testyggdrasil.go b/cli/testyggdrasil.go index 0f9b0cb..c549d31 100644 --- a/cli/testyggdrasil.go +++ b/cli/testyggdrasil.go @@ -1,9 +1,19 @@ package cli import ( + "context" "fmt" + "io" "lbc/cfg" "lbc/yggdrasil" + "log" + "net" + "net/url" + "os" + "os/signal" + "strings" + "syscall" + "time" "github.com/spf13/cobra" ) @@ -14,16 +24,106 @@ var testYggdrasilCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { v, err := cfg.LoadViperConfig(defaultConfigPath) if err != nil { - return fmt.Errorf("не удалось прочитать конфигурацию: %w", err) + fmt.Fprintf(os.Stderr, "не удалось прочитать конфигурацию viper: %v", err) + os.Exit(1) } - if err := yggdrasil.TestConnectivity(v); err != nil { - return fmt.Errorf("тест не пройден: %w", err) + config, err := cfg.ReadConfig(defaultConfigPath) + if err != nil { + fmt.Fprintf(os.Stderr, "конфигурация не прочитана: %v", err) } - fmt.Println("Yggdrasil connectivity test successful") + ctx, cancel := context.WithCancel(context.Background()) + laddrReturner := make(chan string, 2) + go yggdrasil.Yggdrasil(v, laddrReturner) + go notblockchain(ctx, dbPath, config, laddrReturner) + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh // ждём SIGINT/SIGTERM + cancel() // говорим горутинам завершаться return nil }, } +func notblockchain(ctx context.Context, dbPath string, config *cfg.Config, laddrReturner chan string) error { + isGenesis := false + laddr := "tcp://" + <-laddrReturner + p2peers := <-laddrReturner + + u, err := url.Parse(laddr) + if err != nil { + log.Fatalf("bad laddr %q: %v", laddr, err) + } + proto := u.Scheme // "tcp" + listenAddr := u.Host // "127.0.0.1:8000" + ln, err := net.Listen(proto, listenAddr) + if err != nil { + log.Fatalf("listen %s failed: %v", listenAddr, err) + } + log.Printf("listening on %s://%s", proto, listenAddr) + + // --- парсим пиров --- + var peerAddrs []string + for _, entry := range strings.Split(p2peers, ",") { + parts := strings.SplitN(entry, "@", 2) + if len(parts) != 2 { + log.Printf("skip invalid peer entry: %q", entry) + continue + } + peerAddrs = append(peerAddrs, parts[1]) + } + if len(peerAddrs) == 0 { + log.Fatalf("нет ни одного валидного пира в %q", p2peers) + } + + // --- стартовое сообщение от не-genesis --- + if !isGenesis { + initial := []byte("HELLO_FROM_JOINER\n") + for _, pa := range peerAddrs { + go sendToPeer(proto, pa, initial) + } + } + + // --- главный цикл приёма+форвард--- + for { + conn, err := ln.Accept() + if err != nil { + log.Printf("accept error: %v", err) + continue + } + go func(c net.Conn) { + defer c.Close() + data, err := io.ReadAll(c) + if err != nil { + log.Printf("read error: %v", err) + return + } + + log.Printf("received %d bytes, waiting 10s before forwarding", len(data)) + time.Sleep(10 * time.Second) + + for _, pa := range peerAddrs { + go sendToPeer(proto, pa, data) + } + }(conn) + } +} + +// sendToPeer коннектится к одному пиру и шлёт данные. +func sendToPeer(proto, addr string, data []byte) { + conn, err := net.Dial(proto, addr) + if err != nil { + log.Printf("dial %s://%s error: %v", proto, addr, err) + return + } + defer conn.Close() + + if _, err := conn.Write(data); err != nil { + log.Printf("write to %s error: %v", addr, err) + } else { + log.Printf("forwarded %d bytes to %s", len(data), addr) + } +} + func init() { rootCmd.AddCommand(testYggdrasilCmd) } diff --git a/yggdrasil/autopeering.go b/yggdrasil/autopeering.go index 0b9ec1c..7b99dc7 100644 --- a/yggdrasil/autopeering.go +++ b/yggdrasil/autopeering.go @@ -3,6 +3,7 @@ package yggdrasil import ( "context" "io/fs" + "log" "math/rand" "net" "net/url" @@ -64,9 +65,10 @@ func getPublicPeers() []url.URL { } var peers []url.URL - re := regexp.MustCompile(`(?m)^\s*(tcp|tls)://[^\s]+`) + re := regexp.MustCompile(`(?m)(tcp|tls)://[^\s` + "`" + `]+`) filepath.WalkDir(tempDir, func(path string, d fs.DirEntry, err error) error { if err != nil { + log.Printf("walk error: %v", err) return nil } if d.IsDir() || !strings.HasSuffix(d.Name(), ".md") || d.Name() == "README.md" { @@ -77,7 +79,8 @@ func getPublicPeers() []url.URL { return nil } for _, m := range re.FindAllStringSubmatch(string(data), -1) { - url, err := url.Parse(strings.TrimSpace(m[1])) + urlStr := strings.TrimSpace(m[0]) + url, err := url.Parse(urlStr) if err != nil { panic(err) } diff --git a/yggdrasil/main.go b/yggdrasil/main.go index 3836c4f..c298665 100644 --- a/yggdrasil/main.go +++ b/yggdrasil/main.go @@ -82,12 +82,12 @@ func Yggdrasil(config *viper.Viper, ch chan string) { cfg.AdminListen = ygg.GetString("admin_listen") cfg.Listen = ygg.GetStringSlice("listen") if ygg.GetString("peers") == "auto" { - publicPeers := getPublicPeers() - var urlsAsStrings []string + publicPeers := RandomPick(GetClosestPeers(getPublicPeers(), 20), 3) + var urls []string for _, u := range publicPeers { - urlsAsStrings = append(urlsAsStrings, u.String()) + urls = append(urls, u.String()) } - cfg.Peers = urlsAsStrings + cfg.Peers = urls } else { cfg.Peers = ygg.GetStringSlice("peers") } @@ -284,5 +284,3 @@ func Yggdrasil(config *viper.Viper, ch chan string) { } n.core.Stop() } - -