首页 > 代码库 > [RxJS] Subject: an Observable and Observer hybrid

[RxJS] Subject: an Observable and Observer hybrid

This lesson teaches you how a Subject is simply a hybrid of Observable and Observer which can act as a bridge between the source Observable and multiple observers, effectively making it possible for multiple observers to share the same Observable execution.

 

var observable = Rx.Observable.interval(1000).take(5);var observerA = {  next: function (x) { console.log(A next  + x); },  error: function (err) { console.log(A error  + err); },  complete: function () { console.log(A done); },};var observerB = {  next: function (x) { console.log(B next  + x); },  error: function (err) { console.log(B error  + err); },  complete: function () { console.log(B done); },};observable.subscribe(observerA);setTimeout(  () => {    observable.subscribe(observerB);  },2000)

In the code above, we have two ‘observers‘, because we call subscribe twice:

observable.scbscribe(ObserverA);observable.scbscribe(ObserverB);

 

If we want to have one observer, so we need to call subscribe only once.

For that we can build a bridgeObservers, which will loop though the observers:

const observable = Rx.Observable.interval(1000).take(5);const ObserverA = {  next: function(x){    console.log("A next " + x)  },  error: function(x){    console.error("A error " + x)  },  complete: function(){    console.log("A Done")  },};const ObserverB = {  next: function(x){    console.log("B next " + x)  },  error: function(x){    console.error("B error " + x)  },  complete: function(){    console.log("B Done")  },};const BridgeObservers = {  next: function(x){    this.observers.forEach(      o => o.next(x)    )  },  error: function(x){    this.observers.forEach(      o => o.error(x)    )  },  complete: function(){    this.observers.forEach(      o => o.complete()    )  },  observers: [],  addObserver: function(observer){    this.observers.push(observer)  }};observable.subscribe(BridgeObservers);BridgeObservers.addObserver(ObserverA);setTimeout(function(){  BridgeObservers.addObserver(ObserverB);}, 2000)

 

And this partten:

observable.subscribe(BridgeObservers);BridgeObservers.addObserver(ObserverA); // BirdegeObservers.subscribe(ObserverA)

is actually ‘subject‘ partten, works both as Observer and Observable.

 

Subject:

const observable = Rx.Observable.interval(1000).take(5);const ObserverA = {  next: function(x){    console.log("A next " + x)  },  error: function(x){    console.error("A error " + x)  },  complete: function(){    console.log("A Done")  },};const ObserverB = {  next: function(x){    console.log("B next " + x)  },  error: function(x){    console.error("B error " + x)  },  complete: function(){    console.log("B Done")  },};const subject = new Rx.Subject();/*const BridgeObservers = {  next: function(x){    this.observers.forEach(      o => o.next(x)    )  },  error: function(x){    this.observers.forEach(      o => o.error(x)    )  },  complete: function(){    this.observers.forEach(      o => o.complete()    )  },  observers: [],  subscribe: function(observer){    this.observers.push(observer)  }};*/observable.subscribe(subject);subject.subscribe(ObserverA);//BridgeObservers.subscribe(ObserverA);setTimeout(function(){  subject.subscribe(ObserverB); // BridgeObservers.subscribe(ObserverB);}, 2000)

 In the end, ObserverA and ObserverB share one single observer. 

[RxJS] Subject: an Observable and Observer hybrid