一、常用的RxSwift高阶函数
1、just
// 生成一个序列,然后订阅
func test4() {
let dic: [String: Any] = ["name": "Beckhams", "age": 18]
let dicObservable = Observable<[String: Any]>.just(dic)
.subscribe(onNext: { dicData in
print(dicData)
}) { error in
print("error")
} onCompleted: {
print("complete")
}
}
2、drive
func testDrive() {
// driver 必须要在主线程调用的,故常用来绑定刷新UI
let result = inputTF.rx.text.orEmpty
.asDriver()
.flatMap {
return self.dealwithData(inputText: $0)
.asDriver(onErrorJustReturn: "检测到了错误事件")
}
// 请求一次网络
// 绑定到了UI - 主线程
// titlt - 非error
result.map { "长度: \(($0 as! String).count)"}.drive(self.textLabel.rx.text)
result.map { "\($0 as! String)"}
.drive(self.btn.rx.title())
}
func dealwithData(inputText:String)-> Observable<Any>{
print("请求网络了 \(Thread.current)") // data
return Observable<Any>.create({ (ob) -> Disposable in
if inputText == "1234" {
ob.onError(NSError.init(domain: "com.lgcooci.cn", code: 10086, userInfo: nil))
}// json - model username pasword
// username passopp - lgError - 8001 - httpCoder 100 - 500
// 封装 vm - error -
DispatchQueue.global().async {
print("发送之前看看: \(Thread.current)")
ob.onNext("已经输入:\(inputText)")
ob.onCompleted()
}
return Disposables.create()
})
}
3、combineLatest 序列组合
func testCombineLatest() {
// 2个序列组合
let phone = PublishSubject<Bool>()
let password = PublishSubject<Bool>()
Observable.combineLatest(phone, password).subscribe(onNext: { (b1,b2) in
if b1 && b2 {
print("账号和密码都合乎规则,可以点击登录了")
}else {
print("账号和密码有一项不和规则")
}
}).disposed(by: disposeBag)
phone.onNext(true)
password.onNext(false)
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
password.onNext(true)
}
}
4、map 序列映射
// map: 转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列。
func testMap() {
print("*****map*****")
let ob = Observable.of(1,2,3,4)
ob.map { (number) -> Int in
return number+2
}
.subscribe{
print("\($0)")
}
.disposed(by: disposeBag)
}
5、flatMap与flatMapListest
func testFlatMap() {
// flatMap and flatMapLatest
// 将可观测序列发射的元素转换为可观测序列,并将两个可观测序列的发射合并为一个可观测序列。
// 这也很有用,例如,当你有一个可观察的序列,它本身发出可观察的序列,你想能够对任何一个可观察序列的新发射做出反应(序列中序列:比如网络序列中还有模型序列)
// flatMap和flatMapLatest的区别是,flatMapLatest只会从最近的内部可观测序列发射元素
print("*****flatMap*****")
let boy = LGPlayer(score: 100)
let girl = LGPlayer(score: 90)
let player = BehaviorSubject(value: boy)
player.asObservable()
.flatMap { $0.score.asObservable() } // 本身score就是序列 模型就是序列中的序列
.subscribe(onNext: { print(":\($0)") })
.disposed(by: disposeBag)
boy.score.onNext(60)
player.onNext(girl)
boy.score.onNext(50)
boy.score.onNext(40)// 如果切换到 flatMapLatest 就不会打印
girl.score.onNext(10)
girl.score.onNext(0)
let player1 = BehaviorSubject(value: boy)
player1.asObserver().map({ playvalue in
print(playvalue)
return playvalue.age
}).subscribe(onNext: { print($0)})
// flatMapLatest实际上是map和switchLatest操作符的组合。
}
6、takeWhile与takeUntil、skipUntil
take:只从一个可观察序列的开始发出指定数量的元素。
takeWhile: 只要指定条件的值为true,就从可观察序列的开始发出元素
takeUntil:条件成立就结束序列。当前序列会发送元素,知道另一个序列发送了,当前序列就结束
skipUntil:抑制从源可观察序列发出元素,直到参考可观察序列发出元素。比如另一个序列发送了,当前序列才发送,否则跳过
func testTake() {
print("*****take*****")
Observable.of("Hank", "Kody","Cooci", "CC")
.take(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
}
// 结果:Hank, Kody
func testTakeWhile() {
print("*****takeWhile*****")
Observable.of(1, 2, 3, 4, 5, 6)
.takeWhile { $0 < 3 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
//结果:1,2
// 此时和filter一样的效果
func takeUntil() {
print("*****takeUntil*****")
let sourceSeq = PublishSubject<String>()
let referenceSeq = PublishSubject<String>()
sourceSeq
.takeUntil(referenceSeq)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
sourceSeq.onNext("Jim")
sourceSeq.onNext("Lili")
sourceSeq.onNext("CC")
referenceSeq.onNext("Lucy") // 条件一出来,下面不走了
sourceSeq.onNext("Lina")
sourceSeq.onNext("beckhams")
sourceSeq.onNext("wawa")
}
// 结果就是Jim、Lili、CC
func skipUntil() {
print("*****skipUntil*****")
let sourceSeq = PublishSubject<String>()
let referenceSeq = PublishSubject<String>()
sourceSeq
.takeUntil(referenceSeq)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//当前都会被跳过
sourceSeq.onNext("Jim")
sourceSeq.onNext("Lili")
sourceSeq.onNext("CC")
referenceSeq.onNext("Lucy") // 条件一出来,下面才走了
sourceSeq.onNext("Lina")
sourceSeq.onNext("beckhams")
sourceSeq.onNext("wawa")
}
// 结果就是Lina、beckhams、wawa
// takeUntil和skipUntil 刚好相反
7、filter
filter: 过滤条件,仅从满足指定条件的可观察序列中发出那些元素
func testFilter() {
print("*****filter*****")
Observable.of(1,2,3,4,5,6,7,8,9,0)
.filter { $0 % 2 == 0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
// 结果就是2、4、6、8、0
8、retry
retry: 通过无限地重新订阅可观察序列来恢复重复的错误事件
func testRetry() {
print("*****retry*****")
var count = 1 // 外界变量控制流程
let sequenceRetryErrors = Observable<String>.create { observer in
observer.onNext("Hank")
observer.onNext("Kody")
observer.onNext("CC")
if count == 1 { // 流程进来之后就会过度-这里的条件可以作为出口,失败的次数
observer.onError(self.lgError) // 接收到了错误序列,重试序列发生
print("错误序列来了")
count += 1
}
observer.onNext("Lina")
observer.onNext("小雁子")
observer.onNext("婷婷")
observer.onCompleted()
return Disposables.create()
}
sequenceRetryErrors
.retry()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
9、share与whileConnected
func testShare() {
// 有多个订阅者的时候,共享同一份结果。比如下面例子中的2个subscribe,只会打印一次 “我开始网络请求了” 。replay参数表示缓存几个onNext。scope参数中 whileConnected:每个connection 都有单独的一个Subject存储事件Event;forever:用一个Subject存储所有的connections的事件Event
let net = Observable<String>.create { (ob) -> Disposable in
print("我开始网络请求了")
ob.onNext("请求结果1")
//ob.onNext("请求结果2")
ob.onCompleted()
return Disposables.create {
print("销毁了")
}
}.share(replay: 1, scope: .forever)
// }.share(replay: 2, scope: .forever)
// }.share(replay: 2, scope: .whileConnected)
net.subscribe(onNext:{
print("第一次订阅:\($0)",Thread.current)
})
.disposed(by: disposeBag)
net.subscribe(onNext:{
print("第二次订阅:\($0)",Thread.current)
}).disposed(by: disposeBag)
net.subscribe(onNext:{
print("第三次订阅:\($0)",Thread.current)
}).disposed(by: disposeBag)
}
10、PublishSubject、BehaviorSubject、ReplaySubject、AsyncSubject
func testSubject() {
// 在RxsWift中还有一种非常特殊的序列 既是观察者也是响应者
// Subject是一个代理,它既是Observer,也是Observable
// PublishSubject
// 1:初始化序列
let publishSub = PublishSubject<Int>() //初始化一个PublishSubject 装着Int类型的序列
// 2:发送响应序列
publishSub.onNext(1)
// 3:订阅序列
publishSub.subscribe { print("订阅到了:",$0)}
.disposed(by: disposbag)
// 再次发送响应
publishSub.onNext(2)
publishSub.onNext(3)
/* 结果是: 订阅到了: next(2)
订阅到了: next(3)
*/
print("**********BehaviorSubject**********")
// BehaviorSubject
// 1:创建序列
let behaviorSub = BehaviorSubject.init(value: 100)
// 2:发送信号
behaviorSub.onNext(2)
behaviorSub.onNext(3)
// 3:订阅序列
behaviorSub.subscribe{ print("订阅到了:",$0)}
.disposed(by: disposbag)
// 再次发送
behaviorSub.onNext(4)
behaviorSub.onNext(5)
// 再次订阅
behaviorSub.subscribe{ print("订阅到了:",$0)}
.disposed(by: disposbag)
/* 结果是: 订阅到了: next(3)
订阅到了: next(4)
订阅到了: next(5)
订阅到了: next(5)
*/
print("**********ReplaySubject**********")
// ReplaySubject
// 1:创建序列
let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
// let replaySub = ReplaySubject<Int>.createUnbounded()
// 2:发送信号
replaySub.onNext(1)
replaySub.onNext(2)
replaySub.onNext(3)
replaySub.onNext(4)
// 3:订阅序列
replaySub.subscribe{ print("订阅到了:",$0)}
.disposed(by: disposbag)
// 再次发送
replaySub.onNext(7)
replaySub.onNext(8)
replaySub.onNext(9)
/* 结果是: 订阅到了: next(3)
订阅到了: next(4)
订阅到了: next(7)
订阅到了: next(8)
订阅到了: next(9)
*/
print("**********AsyncSubject**********")
// AsyncSubject
// 1:创建序列
let asynSub = AsyncSubject<Int>.init()
// 2:发送信号
asynSub.onNext(1)
asynSub.onNext(2)
// 3:订阅序列
asynSub.subscribe{ print("订阅到了:",$0)}
.disposed(by: disposbag)
// 再次发送
asynSub.onNext(3)
asynSub.onNext(4)
asynSub.onError(NSError.init(domain: "lgcooci", code: 10086, userInfo: nil))
asynSub.onCompleted()
/** 结果是:订阅到了: error(Error Domain=lgcooci Code=10086 "(null)") **/
print("**********Variable**********")
// Variable : 5.0已经废弃(BehaviorSubject 替换) - 这里板书 大家可以了解一下
// 1:创建序列
let variableSub = Variable.init(1)
// 2:发送信号
variableSub.value = 100
variableSub.value = 10
// 3:订阅信号
variableSub.asObservable().subscribe{ print("订阅到了:",$0)}
.disposed(by: disposbag)
// 再次发送
variableSub.value = 1000
}