Управление несколькими загрузками с помощью ReactiveX (на iOS с помощью Swift и Alamofire)

Я пытаюсь загрузить несколько фотографий на сервер с помощью ReactiveX (RxSwift), собираю ответы на каждый запрос, а затем делаю один окончательный запрос для завершения отправки.

Кажется, все работает довольно хорошо, пока я не попытаюсь reduce все ответы. Последний subscribeNext никогда не вызывается. (Возможно, я неправильно понимаю, как работает flatMap или reduce?)

В частности, именно так я пытаюсь выполнить эту процедуру.

  • Подготовьте наблюдаемую для кодирования каждой фотографии (self.imageMgr является экземпляром PHCachingImageManager())

    func getPhotoDataObservable(asset: PHAsset) -> Observable<NSData> {
        return create { observer in
            self.imageMgr.requestImageForAsset(asset,
                targetSize: PHImageManagerMaximumSize,
                contentMode: .AspectFit,
                options: nil,
                resultHandler: { (myImage, myInfo) -> Void in
                    let data = UIImageJPEGRepresentation(myImage!, 1.0)!
                    NSLog("Encoded photo")
                    observer.onNext(data)
                    self.converts += 1
                    if self.converts == self.userReview.photos.count {
                        NSLog("Completed encoding photos")
                        observer.onCompleted()
                    }
                })
            return NopDisposable.instance
        }
    }
    
  • Подготовьте наблюдаемую для загрузки каждой фотографии после кодирования (с помощью Alamofire и RxAlamofire).

    func getPostPhotoObservable(photoData: NSData) -> Observable<ReviewPhotoObject> {
        return create { observer in
            NSLog("Uploading Photo")
    
            upload(.POST,
                urlRequest.URLString,
                headers: nil,
                multipartFormData: { mfd in
                    mfd.appendBodyPart(data: photoData, name: "image", fileName: "image", mimeType: "image/jpeg")
                },
                encodingMemoryThreshold: Manager.MultipartFormDataEncodingMemoryThreshold,
                encodingCompletion: { encodingResult in
                    switch encodingResult {
                    case .Success(let upload, _, _):
                        upload.responseJSON(completionHandler: { (myResponse) -> Void in
                            if let photoResponse = myResponse.result.value {
                                let photoObject = photoResponse.objectForKey("photo")!
                                let photo = ReviewPhotoObject()
                                photo.photoID = photoObject.objectForKey("id")! as! NSNumber
                                NSLog("Uploaded Photo")
                                observer.onNext(photo)
                            }
    
                            self.uploads += 1
                            if self.uploads == self.userReview.photos.count {
                                NSLog("Completed uploading photos")
                                observer.onCompleted()
                            }
                        })
    
                    case .Failure(let encodingError):
                        observer.onError(encodingError)
                        print(encodingError)
                    }
                })
    
            return NopDisposable.instance
        }
    }
    
  • Наконец, собрать все вместе

    func postReview(review: MyReview) {
        self.userReview = review
    
        _ = review.photos.toObservable().flatMap { photos in
            return self.getPhotoDataObservable(photos)
        }.flatMap { photoData in 
            return self.getPostPhotoObservable(photoData)
        }.reduce([], { var accumulator, photo: ReviewPhotoObject) -> [Int] in
            accumulator.append(Int(photo.photoID))
            return accumulator
        }).subscribeNext({ (photoIds) -> Void in
            print(photoIds) // Never called
        })
    }
    

При запуске (например, с 2 фотографиями) это вывод:

Encoded photo
Uploading photo
Encoded photo
Uploading photo
Completed encoding photos
Uploaded photo
Uploaded photo
Completed uploading photos

Но subscribeNext никогда не вызывается. Поскольку документация конкретно по RxSwift все еще немного скудна, я надеялся, что кто-нибудь здесь сможет подсказать мне, что я неправильно понимаю.


person Matt Pinkston    schedule 14.12.2015    source источник
comment
Я не понял, как работают наблюдаемые. В этом случае мне следовало позвонить onComplete() сразу после onNext(). Я оставлю этот вопрос на некоторое время без ответа, чтобы дать более опытному реактивному программисту возможность объяснить лучше, чем я сейчас. (Я отвечу позже после небольшого изучения)   -  person Matt Pinkston    schedule 14.12.2015


Ответы (1)


Идея здесь в том, что как только наблюдаемая отправит все элементы, которые она собирается отправить, она должна завершиться. Вы создаете наблюдаемую для каждого PHAsset, и эта наблюдаемая отправляет только один элемент, поэтому он должен завершиться после этого. В том виде, в котором у вас был код, завершался только последний, поэтому оператор reduce просто сидел и ждал, пока завершатся остальные, прежде чем он сможет завершить свою работу.

Вот как я бы написал первую функцию (в Swift 3 вместо 2).

extension PHImageManager {

    func requestMaximumSizeImage(for asset: PHAsset) -> Observable<UIImage> {
        return .create { observer in
            let request = self.requestImage(for: asset, targetSize: PHImageManagerMaximumSize, contentMode: .aspectFit, options: nil, resultHandler: { image, info in
                if let image = image {
                    observer.onNext(image)
                    observer.onCompleted()
                }
                else if let info = info, let error = info[PHImageErrorKey] as? Error {
                    observer.onError(error)
                }
            })
            return Disposables.create { self.cancelImageRequest(request) }
        }
    }
}

Вы увидите, что я сделал бы это расширением PHImageManager вместо бесплатной функции, но это всего лишь разница в стиле. Функциональные различия заключаются в том, что мой код выдаст ошибку, если базовый запрос выдаст ошибку, и отменит запрос, если все подписчики откажутся до завершения запроса. Кроме того, он не выполняет преобразование JPEG. Сохраняйте эти операции небольшими и выполняйте преобразование JPEG внутри карты следующим образом:

    let imagesData = review.photos.toObservable().flatMap {
        self.imageMgr.requestMaximumSizeImage(for: $0)
    }.map {
        UIImageJPEGRepresentation($0, 1.0)
    }.filter { $0 != nil }.map { $0! }

Приведенный выше код запрашивает изображения у менеджера, затем преобразует их в данные JPEG, отфильтровывая все, что не удалось преобразовать. imagesData относится к типу Observable<Data>.

Ваш getPostPhotoObservable в порядке, за исключением завершенной проблемы и того факта, что он не обрабатывает отмену в одноразовом выпуске. Кроме того, вы можете просто заставить свою функцию публикации возвращать Observable вместо того, чтобы оборачивать результат в ReviewPhotoObject.

Другие предупреждения:

  1. То, как вы собираете все это вместе, не гарантирует, что ReviewPhotoObjects будут в том же порядке, что и фотографии (потому что вы не можете гарантировать порядок завершения загрузки). Чтобы исправить это, при необходимости, вам нужно будет использовать concat вместо flatMap.

  2. Если какая-либо загрузка завершится неудачно, весь конвейер отключится и прервет все последующие загрузки. Вероятно, вам следует настроить что-то, чтобы ловить ошибки и делать что-то соответствующее. Либо catchErrorJustReturn, либо catchError в зависимости от ваших требований.

person Daniel T.    schedule 19.04.2017
comment
Спасибо за продуманный ответ! Это определенно решает исходный вопрос. (Вы также правы насчет функциональных особенностей этого кода.) - person Matt Pinkston; 19.04.2017