Многопоточность Java, выполняющая больше, чем границы цикла

Я создаю веб-скребок для извлечения ссылок и электронных писем из Интернета. Ссылки будут использоваться для поиска новых мест для поиска электронных писем, а затем электронные письма будут храниться в наборе. Каждая ссылка передается в фиксированный пул потоков в своем собственном потоке для поиска дополнительных электронных писем. Я начал с малого и искал только 10 писем, но по какой-то причине мой код возвращает около 13 писем.

    while (emailSet.size() <= EMAIL_MAX_COUNT) {

        link = linksToVisit.poll();

        linksToVisit.remove(link);
        linksVisited.add(link);
        pool.execute(new Scraper(link));
    }

    pool.shutdownNow();

    emailSet.stream().forEach((s) -> {
        System.out.println(s);
    });
    System.out.println(emailSet.size());

Хотя я понимаю, что можно создать дополнительные темы, которые будут работать после того, как я получу 10 электронных писем, не следует ли pool.shutdownNow() завершать эти темы?

Вот мой код темы, если это поможет.

class Scraper implements Runnable {

    private String link;

    Scraper(String s) {
        link = s;
    }

    @Override
    public void run() {
        try {
            Document doc = (Document) Jsoup.connect(link).get();

            Elements links = doc.select("a[href]");
            for (Element href : links) {
                String newLink = href.attr("abs:href");
                if (!linksVisited.contains(newLink) && !linksToVisit.contains(newLink)) {
                    linksToVisit.add(newLink);
                }
            }
            Pattern p = Pattern.compile(
                    "[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+");
            Matcher matcher = p.matcher(doc.text());
            while (matcher.find()) {
                emailSet.add(matcher.group());
            }
        } catch (Exception e) {
            //Catch on of the many exceptions Jsoup.connect might throw 
            // and just let the thread expire.
        }
    }
}

Редактировать 1:

Я должен включить это в свой первый раз, но я использую потокобезопасный набор и очередь.

Set<String> emailSet = Collections.synchronizedSet(new HashSet());
BlockingQueue<String> linksToVisit = new ArrayBlockingQueue(10000);
Set<String> linksVisited = Collections.synchronizedSet(new HashSet());
final int EMAIL_MAX_COUNT = 10;
ExecutorService pool = newFixedThreadPool(25);

Редактировать 2

Решил, что я должен обновить свой вопрос с ответом, так что вот где была моя проблема.

    while (emailSet.size() <= EMAIL_MAX_COUNT) {

    link = linksToVisit.poll();

    linksToVisit.remove(link);
    linksVisited.add(link);
    pool.execute(new Scraper(link));
}

Мой список начнется только с одной ссылки. После того, как первая ссылка была удалена, у меня был пустой список, который продолжал создавать новые темы без ссылок для поиска. Прежде чем список мог быть заполнен, я уже создал сотни потоков, которые ничего не делали, но замедляли работу системы, пока она, наконец, не рухнула.

Вот исправление кода, чтобы гарантировать, что потоки не будут созданы, если не будет ссылки для поиска.

while (emailSet.size() <= EMAIL_MAX_COUNT) {

        if (linksToVisit.size() > 0) {
            link = linksToVisit.poll();

            linksToVisit.remove(link);
            linksVisited.add(link);
            pool.execute(new Scraper(link));
            //System.out.println("Emails " + emailSet.size());
        } else {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ex) {
                Logger.getLogger(Crawler.class.getName())
                        .log(Level.SEVERE, null, ex);
            }
        }
    }

person mcb    schedule 29.05.2015    source источник
comment
Каково значение EMAIL_MAX_COUNT?   -  person Lyrion    schedule 29.05.2015
comment
Почему вы копаетесь в Интернете в поисках адресов электронной почты? Я вижу здесь потенциальное зло!   -  person K Erlandsson    schedule 29.05.2015
comment
EMAIL_MAX_COUNT: 10.   -  person mcb    schedule 29.05.2015
comment
Если вы не синхронизируете что-либо из этого, у вас есть потоки, записывающие в emailSet, в то время как другие читают его размер. Давайте рассмотрим простой пример с двумя потоками T1 и T2. T1 входит в ваш критический раздел (цикл while) и считывает размер. Он продолжает работать. T2 входит в цикл while и считывает размер до добавления T1. Затем T2 продолжает работу, а T1 добавляет к набору, который теперь содержит 10 элементов. Однако T2 уже находится в критической секции и также добавляет элемент, в результате чего получается 11 элементов. Когда это происходит с более чем 2 потоками, бинго, произвольное число выше 10.   -  person SBI    schedule 29.05.2015
comment
да в принципе не поможет ответить на это ;)   -  person Joeblade    schedule 29.05.2015
comment
Отредактировано, чтобы показать, что я синхронизирую код. Также это HW, не пытающийся быть злым. Мой учитель в основном хочет, чтобы мы очистили все электронные письма, связанные с моей школой.   -  person mcb    schedule 29.05.2015
comment
Использование потокобезопасных коллекций само по себе не защищает автоматически от проблем параллелизма. Сами коллекции лишь гарантируют безопасность любого доступа к их данным. Однако ваша логика должна отражать ваши требования, и ваша задача — убедиться, что коллекции используются в правильном контексте.   -  person SBI    schedule 29.05.2015


Ответы (1)


Вы запускаете скрейпер асинхронно внутри цикла, который проверяет размер emailSet, но в течение одного цикла цикла скрапер может найти более одного письма, или вы можете запустить более одного скрейпера и после его запуска он добавляет alla электронная почта ссылается на страницу Учитывайте следующее время

T1 loop start ->T2 loop schedule Scaper ->T3 loop check emailSet ->T4 Scraper finds 13 email -> T5  loop check emailSet

или следующий

T1 loop start ->T2 loop schedule Scaper "1" ->T3 loop check emailSet ->T4 loop schedule Scaper "2" T5 -> Scraper "1" finds 6 emails -> T6  loop check emailSet -> Scraper "1" finds 7 emails

и так далее.

Если вы хотите остановиться, когда найдете 10 электронных писем, вам нужно изменить следующее.

while (matcher.find()) {
    emailSet.add(matcher.group());
}

to

while (matcher.find()) {
    if (emailSet.size() <= EMAIL_MAX_COUNT) {
        emailSet.add(matcher.group());
    }
}

и даже это не полностью гарантирует, что вы можете остановиться на EMAIL_MAX_COUNT, потому что с несколькими потоками (например, 3) можно проверить размер и получить 9, а затем все они вставят электронное письмо.

Вы должны синхронизировать операции чтения и записи в одном блоке (с помощью synchronized(emailSet) или с помощью блокировки), если хотите обеспечить точный размер emailSet; что-то вроде

while (matcher.find()) {
    synchronized(emailSet) {
       if (emailSet.size() <= EMAIL_MAX_COUNT) {
           emailSet.add(matcher.group());
       }
    }
}
person Giovanni    schedule 29.05.2015