RxSwift在地图中可观察到的多个

最后发布: 2019-04-15 18:58:19


问题

我遇到了一种情况,我将获取一个API,该API会生成注册用户的json数据。 然后,我将不得不遍历每个用户,并从远程URL获取他们的化身并将其保存到磁盘。 我可以在subscribe执行第二个任务,但这不是最佳实践。 我正在尝试用mapflatMap等实现它。

这是我的示例代码:

self.dataManager.getUsers()
            .observeOn(MainScheduler.instance)
            .subscribeOn(globalScheduler)
            .map{ [unowned self] (data) -> Users in
                var users = data
// other code for manipulating users goes here
// then below I am trying to use another loop to fetch their avatars

                if let cats = users.categories {
                    for cat in cats  {
                        if let profiles = cat.profiles {
                            for profile in profiles {
                                if let thumbnail = profile.thumbnail,
                                    let url = URL(string: thumbnail) {
                                    URLSession.shared.rx.response(request: URLRequest(url: url))
                                        .subscribeOn(MainScheduler.instance)
                                        .subscribe(onNext: { response in
                                            // Update Image
                                            if let img = UIImage(data: response.data) {
                                                try? Disk.save(img, to: .caches, as: url.lastPathComponent)
                                            }
                                        }, onError: { (error) in

                                        }).disposed(by: self.disposeBag)
                                }
                            }
                        }
                    }
                }

                return users
            }
            .subscribe(onSuccess: { [weak self] (users) in

            }).disposed(by: disposeBag)

这段代码有2个问题。 首先是URLSession上的rx ,它在另一个线程的后台执行任务,并且在此操作完成时无法确认主subscribe 其次是循环和rx,效率不高,因为它应该生成多个可观察对象然后对其进行处理。

任何改进此逻辑的想法都值得欢迎。

rx-swift reactivex
回答

这是一个有趣的难题。

解决此问题的“特殊调味料”在此行中:

.flatMap { 
    Observable.combineLatest($0.map { 
        Observable.combineLatest(
            Observable.just($0.0), 
            URLSession.shared.rx.data(request: $0.1)
                .materialize()
        ) 
    }) 
}

该行之前的map创建一个Observable<[(URL, URLRequest)]> ,相关行将其转换为Observable<[(URL, Event<Data>)]>

该行通过以下方式执行此操作:

  1. 设置网络调用以创建Observable<Data>
  2. 实例化它以创建Observable<Event<Data>> (这样做是为了使一次下载中的错误不会关闭整个流。)
  3. 将URL改回Observable,这为我们提供了Observable<URL>
  4. 合并步骤2和3中的Observable<(URL, Event<Data>)>以产生Observable<(URL, Event<Data>)>
  5. 映射每个数组元素以产生[Observable<(URL, Event<Data>)>]
  6. 合并该数组中的可观察对象,最终产生Observable<[(URL, Event<Data>)]>

这是代码

// manipulatedUsers is for the code you commented out.
// users: Observable<Users>
let users = self.dataManager.getUsers()
    .map(manipulatedUsers) // manipulatedUsers(_ users: Users) -> Users
    .asObservable()
    .share(replay: 1)

// this chain is for handling the users object. You left it blank in your code so I did too.
users
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { users in

    })
    .disposed(by: disposeBag)

// This navigates through the users structure and downloads the images.
// images: Observable<(URL, Event<Data>)>
let images = users.map { $0.categories ?? [] }
    .map { $0.flatMap { $0.profiles ?? [] } }
    .map { $0.compactMap { $0.thumbnail } }
    .map { $0.compactMap { URL(string: $0) } }
    .map { $0.map { ($0, URLRequest(url: $0)) } }
    .flatMap { 
        Observable.combineLatest($0.map { 
            Observable.combineLatest(
                Observable.just($0.0), 
                URLSession.shared.rx.data(request: $0.1)
                    .materialize()
            ) 
        }) 
    }
    .flatMap { Observable.from($0) }
    .share(replay: 1)

// this chain filters out the errors and saves the successful downloads.
images
    .filter { $0.1.element != nil }
    .map { ($0.0, $0.1.element!) }
    .map { ($0.0, UIImage(data: $0.1)!) }
    .observeOn(MainScheduler.instance)
    .bind(onNext: { url, image in
        try? Disk.save(image, to: .caches, as: url.lastPathComponent)
        return // need two lines here because this needs to return Void, not Void?
    })
    .disposed(by: disposeBag)

// this chain handles the download errors if you want to.
images
    .filter { $0.1.error != nil }
    .bind(onNext: { url, error in
        print("failed to download \(url) because of \(error)")
    })
    .disposed(by: disposeBag)