// +build ignore package main import ( "context" "fmt" "log" "time" "github.com/reactivex/rxgo/v2" ) func hello() { observable := rxgo.Just("Hello, World!")() ch := observable.Observe() item := <-ch fmt.Println(item.V) } func hot() { ch := make(chan rxgo.Item) go func() { for i := 0; i < 3; i++ { ch <- rxgo.Of(i) } close(ch) }() observable := rxgo.FromChannel(ch) // First Observer for item := range observable.Observe() { fmt.Println(item.V) } // Second Observer for item := range observable.Observe() { fmt.Println(item.V) } } func cold() { observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) { for i := 0; i < 3; i++ { ch <- rxgo.Of(i) } }}) // First Observer for item := range observable.Observe() { fmt.Println(item.V) } // Second Observer for item := range observable.Observe() { fmt.Println(item.V) } } func connectable() { ch := make(chan rxgo.Item) go func() { for i := 0; i < 100; i++ { ch <- rxgo.Of(i) time.Sleep(time.Second) } close(ch) }() observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy()) // Create the first Observer observable.DoOnNext(func(i interface{}) { fmt.Printf("First observer: %d\n", i) }) // Create the second Observer observable.DoOnNext(func(i interface{}) { fmt.Printf("Second observer: %d\n", i) }) disposed, cancel := observable.Connect(context.Background()) go func() { time.Sleep(3 * time.Second) observable.DoOnNext(func(i interface{}) { fmt.Printf("Third observer: %d\n", i) }) }() /* go func() { // Do something time.Sleep(time.Second) // Then cancel the subscription cancel() }() */ // Wait for the subscription to be disposed _ = disposed _ = cancel } func main() { log.Println("Hello") hello() log.Println("Hot") hot() log.Println("Cold") cold() log.Println("Connectable") connectable() select {} }