Фреймворк Apple Combine: как запустить несколько издателей параллельно и дождаться их завершения?

Я открываю для себя Combine. Я написал методы, которые «комбинируют» HTTP-запросы, например:

func testRawDataTaskPublisher(for url: URL) -> AnyPublisher<Data, Error> {
    var request = URLRequest(url: url,
                             cachePolicy: .useProtocolCachePolicy,
                             timeoutInterval: 15)
    request.httpMethod = "GET"

    return urlSession.dataTaskPublisher(for: request)
        .tryMap {
            return $0.data
        }
        .eraseToAnyPublisher()
}

Я бы хотел вызвать метод несколько раз и все-таки выполнить задачу, например:

let myURLs: [URL] = ...

for url in myURLs {
    let cancellable = testRawDataTaskPublisher(for: url)
        .sink(receiveCompletion: { _ in }) { data in
            // save the data...
        }
}

Приведенный выше код не будет работать, потому что мне нужно сохранить отменяемое значение в переменной, принадлежащей классу. Первый вопрос: это хорошая идея хранить много (например, 1000) отменяемых файлов в чем-то вроде _3 _ ??? Не вызовет ли это утечки памяти?

var cancellables = Set<AnyCancellable>()

...

    let cancellable = ...

    cancellables.insert(cancellable) // ???

И второй вопрос: как запустить задачу, когда все отменяемые завершены? Я думал о чем-то подобном

class Test {
    var cancellables = Set<AnyCancellable>()

    func run() {
        // show a loader

        let cancellable = runDownloads()
            .receive(on: RunLoop.main)
            .sink(receiveCompletion: { _ in }) { _ in
                // hide the loader
            }

        cancellables.insert(cancellable)
    }

    func runDownloads() -> AnyPublisher<Bool, Error> {
        let myURLs: [URL] = ...

        return Future<Bool, Error> { promise in
            let numberOfURLs = myURLS.count
            var numberOfFinishedTasks = 0

            for url in myURLs {
                let cancellable = testRawDataTaskPublisher(for: url)
                    .sink(receiveCompletion: { _ in }) { data in
                        // save the data...
                        numberOfFinishedTasks += 1

                        if numberOfFinishedTasks >= numberOfURLs {
                            promise(.success(true))
                        }
                    }

                cancellables.insert(cancellable)
            }
        }.eraseToAnyPublisher()
    }

    func testRawDataTaskPublisher(for url: URL) -> AnyPublisher<Data, Error> {
        ...
    }
}

Обычно я бы использовал DispatchGroup, запускал несколько задач HTTP и получал уведомление, когда задачи завершены, но мне интересно, как написать это современным способом, используя Combine.


person kampro    schedule 10.12.2019    source источник


Ответы (1)


Вы можете запустить некоторые операции параллельно, создав коллекцию издателей, применив оператор flatMap, а затем collect, чтобы дождаться завершения всех издателей, прежде чем продолжить. Вот пример, который можно запустить на детской площадке:

import Combine
import Foundation

func delayedPublisher<Value>(_ value: Value, delay after: Double) -> AnyPublisher<Value, Never> {
  let p = PassthroughSubject<Value, Never>()
  DispatchQueue.main.asyncAfter(deadline: .now() + after) {
    p.send(value)
    p.send(completion: .finished)
  }
  return p.eraseToAnyPublisher()
}

let myPublishers = [1,2,3]
  .map{ delayedPublisher($0, delay: 1 / Double($0)).print("\($0)").eraseToAnyPublisher() }

let cancel = myPublishers
  .publisher
  .flatMap { $0 }
  .collect()
  .sink { result in
    print("result:", result)
  }

Вот результат:

1: receive subscription: (PassthroughSubject)
1: request unlimited
2: receive subscription: (PassthroughSubject)
2: request unlimited
3: receive subscription: (PassthroughSubject)
3: request unlimited
3: receive value: (3)
3: receive finished
2: receive value: (2)
2: receive finished
1: receive value: (1)
1: receive finished
result: [3, 2, 1]

Обратите внимание, что все издатели сразу запускаются (в исходном порядке).

Из-за задержки 1 / $0 первому издателю требуется больше всего времени для завершения. Обратите внимание на порядок значений в конце. Поскольку выполнение первого задания заняло больше всего времени, это последний элемент.

person Gil Birman    schedule 10.12.2019
comment
Отличное решение imo. Но мне просто интересно, намеренно ли вы изменили тип значения издателей с Int на Double? - person nayem; 11.12.2019
comment
Свифт здесь странный. Поскольку $0 используется для вычисления значения Double (задержки), числа в исходном массиве выводятся как Doubles. Мне, вероятно, следовало избегать дженериков, чтобы было проще. - person Gil Birman; 11.12.2019
comment
Ну, не совсем то, что я думаю, Свифт здесь странный. Не слишком любопытный по этому поводу, но я думаю, что смогу пролить свет на эту причину, если вы позволите мне. Причина здесь - вывод типа $ нотации. Вы определили функцию как универсальную и не помогаете выводу типа определять типы обоих аргументов при вызове. Оба параметра передаются с обозначением $, а параметр delay заставляет тип $ быть Double. Следовательно, общий Value также считается Double. Вы можете избавиться от этого с помощью Double(1 / $0) или 1 / Double($0). - person nayem; 11.12.2019
comment
Я согласен с обоими нашими объяснениями ????Спасибо за предложенное исправление. Обновлен ответ, чтобы включить его. - person Gil Birman; 11.12.2019
comment
Все чисто. Спасибо, проблема решена. - person kampro; 11.12.2019
comment
Обратите внимание, что .publisher может использовать Never ошибку и испортить ваши типы. В качестве альтернативы вы можете использовать Publishers.Sequence(sequence: array.map(...)), чтобы типы ошибок оставались неизменными. - person alejandromp; 08.05.2020
comment
Что, если я хочу сделать то же самое с издателями, которые возвращаются с разными типами? - person Cyber Gh; 26.07.2020