// +build ignore package main import ( "bufio" "context" "fmt" "io" "log" "net" "time" "github.com/reactivex/rxgo/v2" ) func main() { ln, err := net.Listen("tcp", "0.0.0.0:8080") if err != nil { log.Fatalln(err) } fmt.Println("listening on", ln.Addr()) for id, observable := 0, generator(); ; id++ { if conn, err := ln.Accept(); err == nil { fmt.Printf("client[%d] connected from %s\n", id, conn.RemoteAddr()) go readclose(conn, id) go subscribe(conn, observable) } } } func readclose(conn io.ReadCloser, id int) { defer func() { fmt.Printf("client[%d] disconnected: read: EOF\n", id) conn.Close() }() scanner := bufio.NewScanner(conn) for scanner.Scan() { line := scanner.Text() fmt.Printf("client[%d]: %s\n", id, line) } } func subscribe(conn io.Writer, observable rxgo.Observable) { observable.DoOnNext(func(i interface{}) { fmt.Fprintf(conn, "%v\n", i) }) } func generator() rxgo.Observable { ch := make(chan rxgo.Item) go func() { for { ch <- rxgo.Of(time.Now()) time.Sleep(time.Second) } close(ch) }() observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy()) observable.Connect(context.Background()) return observable }