Производитель-потребитель, использующий потоки

Я пишу программу, реализующую проблему производителя-потребителя на Java с использованием концепции многопоточности. Ниже несколько деталей, как я должен это сделать:

1) Основной поток должен создать буфер с емкостью, указанной в качестве аргумента командной строки. Количество потоков производителя и потребителя также указывается в качестве аргументов командной строки. Я должен присвоить уникальный номер каждому потоку-производителю и потребителю. Как присвоить уникальный номер потокам производителя и потребителя?

2) Поток-производитель работает в бесконечном цикле. Он создает элемент данных (строку) в следующем формате: <producer number>_<data item number>. Например, 1-й элемент данных из потока № 1 будет 1_1, а второй элемент данных из потока № 3 будет 3_2. Как создавать элементы данных в таком формате?

3) Затем поток производителя записывает запись в файл журнала производителя (‹ номер производителя > «Сгенерировано» <data item>). После записи записи журнала она пытается вставить ее в буфер. Если вставка прошла успешно, в лог-файле создается запись (<producer number> <data item> «Вставка прошла успешно»). Как написать такой код?

Ниже приведен код Java, который я написал.

import java.util.*;
import java.util.logging.*;

public class PC2
{
    public static void main(String args[])
    {
            ArrayList<Integer> queue = new ArrayList<Integer>();

            int size = Integer.parseInt(args[2]);
            Thread[] prod = new Thread[Integer.parseInt(args[0])];
            Thread[] cons = new Thread[Integer.parseInt(args[1])];

            for(int i=0; i<prod.length; i++)
            {
                    prod[i] = new Thread(new Producer(queue, size));
                    prod[i].start();
            }

            for(int i=0; i<cons.length; i++)
            {
                    cons[i] = new Thread(new Consumer(queue, size));
                    cons[i].start();
                }

    }
}

class Producer extends Thread
{
    private final ArrayList<Integer> queue;
    private final int size;

    public Producer(ArrayList<Integer> queue, int size)
    {
            this.queue = queue;
            this.size = size;
    }

    public void run()
    {
            while(true){
            for(int i=0; i<size; i++)
            {
                    System.out.println("Produced: "+i+" by id " +Thread.currentThread().getId());
try
                    {
                            produce(i);
                            Thread.sleep(3000);
                    }
                    catch(Exception e)
                    {
                            Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, e);
                    }
            }}
    }


    public void produce(int i) throws InterruptedException
    {
            while(queue.size() == size)
            {
                    synchronized(queue)
                    {
                            System.out.println("Queue is full "+Thread.currentThread().getName() +" is waiting, size: "+queue.size());
                            queue.wait();
                       }
            }
            synchronized(queue)
            {
                    queue.add(i);
                    queue.notifyAll();
            }
    }
}

class Consumer extends Thread
{
    private final ArrayList<Integer> queue;
    private final int size;

    public Consumer(ArrayList<Integer> queue, int size)
    {
            this.queue = queue;
            this.size = size;
    }

    public void run()
    {
            while(true)
            {
                    try
                    {       System.out.println("Consumed: "+consume());
                            Thread.sleep(1000);
                    }
                    catch(Exception e)
                    {
                            Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, e);
                    }
            }
    }

    public int consume() throws InterruptedException
    {
            while(queue.isEmpty())
            {
                    synchronized(queue)
                    {
                            System.out.println("Queue is empty "+Thread.currentThread().getName()+" is waiting, size: "+queue.size());
                            queue.wait();
                        }
            }

            synchronized (queue)
            {
                    queue.notifyAll();
                    System.out.println("Consumed by id "+Thread.currentThread().getId());
                    return (Integer) queue.remove(0);

            }
    }
}

Как я могу выполнить вышеуказанные шаги?


person user2201650    schedule 31.10.2013    source источник


Ответы (5)


Я должен присвоить уникальный номер каждому потоку-производителю и потребителю. Как присвоить уникальный номер потокам производителя и потребителя?

Добавьте переменную экземпляра (нестатическую) в классы Producer/Consumer. Когда вы инициализируете новые объекты-производители/потребители, передайте уникальный номер. Вы можете следить за своим номером с помощью int counter в своем основном классе.

2) Поток-производитель работает в бесконечном цикле. Он создает элемент данных (строку) в следующем формате: ‹ номер производителя >_‹ номер элемента данных > . Например, 1-й элемент данных из потока № 1 будет 1_1, а второй элемент данных из потока № 3 будет 3_2. Как создавать элементы данных в таком формате?

Используйте синхронизированные методы и/или атомарные переменные. Посмотрите на Java Concurrency.

3) Затем поток производителя записывает запись в файл журнала производителя (‹ номер производителя > «Сгенерировано» ‹ элемент данных >). После записи записи журнала она пытается вставить ее в буфер. Если вставка прошла успешно, в файле журнала создается запись (‹ номер производителя > ‹ элемент данных > «Вставка успешна»). Как написать такой код?

Мой ответ такой же, как и на предыдущий вопрос: прочтите о параллелизме в Java. Потратьте час на чтение о синхронизации, блокировках и атомарных переменных, и я гарантирую, что вы легко напишете свою программу.

person KyleM    schedule 31.10.2013
comment
Я не должен использовать какие-либо потокобезопасные структуры данных для этой программы. - person user2201650; 31.10.2013
comment
@user2201650 user2201650 Посмотрите на stackoverflow.com/questions/1006655/ . Подумайте, создаете ли вы индекс int и используете его для доступа к своей очереди; например очередь.получить(индекс++). Как вы думаете, что произойдет? - person KyleM; 31.10.2013
comment
Я имею в виду ответ Джона Скита. Он ясно говорит, что операции с примитивными типами (такими как int) в Java атомарны. Поэтому гарантируется, что если один поток выполнит queue.get(index++), новое значение индекса будет немедленно видно всем другим потокам. Поэтому два потока никогда не получат один и тот же объект из очереди. - person KyleM; 31.10.2013
comment
Я новичок в программировании на Java. Так что не могли бы вы помочь мне здесь? - person user2201650; 31.10.2013
comment
Это не сайт для домашних заданий. Помощь, которую я дал вам до сих пор, достаточно легко решить вашу проблему. Извини :( - person KyleM; 31.10.2013
comment
Я новичок в программировании на Java, поэтому я не могу понять ту ссылку, которую вы мне дали :( Хотел бы я получить здесь помощь :( - person user2201650; 31.10.2013

Для проблемы производителя-потребителя лучшим решением является BlockingQueue. Я тестировал несколько вещей, поэтому разработал такую ​​же программу, теперь изменил ее в соответствии с вашими потребностями.

Посмотрите, поможет ли это.

import java.util.concurrent.*;
public class ThreadingExample {

    public static void main(String args[]){
        BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<Message>(100);
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Producer(blockingQueue));
        exec.execute(new Consumer(blockingQueue));
    }

}
class Message{
    private static int count=0;
    int messageId;
    Message(){
        this.messageId=count++;
        System.out.print("message Id"+messageId+" Created ");
    }
}
class Producer implements Runnable{

    private BlockingQueue<Message> blockingQueue;
    Producer(BlockingQueue<Message> blockingQueue){
        this.blockingQueue=blockingQueue;
    }

    @Override
    public void run(){
        while(!Thread.interrupted()){
            System.out.print("Producer Started");
            try {
                blockingQueue.put(new Message());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Producer Done");
        }
    }
}

class Consumer implements Runnable{
    private BlockingQueue<Message> blockingQueue;
    Consumer(BlockingQueue<Message> blockingQueue){
        this.blockingQueue=blockingQueue;
    }

    @Override
    public void run(){
        while(!Thread.interrupted()){
            System.out.print("Concumer Started");
            try{
                Message message  = blockingQueue.take();
                System.out.print("message Id"+message.messageId+" Consumed ");
            }
            catch(InterruptedException e){
                e.printStackTrace();
            }
            System.out.println("Concumer Done");
        }
    }
}
person Vipin    schedule 31.10.2013
comment
Является ли BlockingQueue потокобезопасным? Потому что я не должен использовать какие-либо потокобезопасные структуры данных. - person user2201650; 02.11.2013
comment
Может ли кто-нибудь помочь мне здесь, пожалуйста? (Извините, если напрягаю) - person user2201650; 06.11.2013
comment
@ user2201650 просмотрите этот вопрос для своего ответа stackoverflow.com/questions/2695426/ - person Vipin; 11.11.2013

Я попробовал следующее, что может сработать для вас, за исключением условия буфера на 3, которое вы можете добавить самостоятельно. Надеюсь это поможет.

public class Message {

    private String msg;

    public Message(String msg) {
        super();
        this.msg = msg;
    }

    public String getMsg(){
        return msg;
    }
}




import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private BlockingQueue<Message> queue;
    private boolean run = true;

    public Producer(BlockingQueue<Message> queue) {
        super();
        this.queue = queue;
    }

    public void setRun(boolean val) {
        this.run = val;
    }

    @Override
    public void run() {
        int i = 0;
        while (run) {
            Message msg = new Message(Thread.currentThread().getName() + "_"+ i);
            try {
                Thread.sleep(i * 100);
                queue.put(msg);
                System.out.println("Producer: "+Thread.currentThread().getName()+" produced and added to the queue: "+msg.getMsg());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            i++;
            if(i==10){
                setRun(false);
                System.out.println(Thread.currentThread().getName()+" stopped");
            }
        }

    }
}



import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable{

    private BlockingQueue<Message> queue;
    private boolean run = true;

    public Consumer(BlockingQueue<Message> queue) {
        super();
        this.queue = queue;
    }

    public void setRun(boolean val){
        this.run = val;
    }

    @Override
    public void run() {
        while(run){
            try {
                Thread.sleep(100);
                Message msg = queue.take();
                System.out.println("Consumer: "+Thread.currentThread().getName()+"         generated/consumed "+msg.getMsg());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }

    }
  }




import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerMain {

    public static void main(String[] args) {
        System.out
                .println("please enter the number of producer:consumer:size of the queue in order");

        Scanner scan = new Scanner(System.in);

        Thread[] prodThreads = new Thread[scan.nextInt()];
        Thread[] consThreads = new Thread[scan.nextInt()];
        BlockingQueue<Message> queue = new ArrayBlockingQueue<Message>(scan.nextInt());

        for (int i = 0; i < prodThreads.length; i++) {
            prodThreads[i] = new Thread(new Producer(queue), "" + i);
            prodThreads[i].start();
        }

        for (int i = 0; i < consThreads.length; i++) {
            consThreads[i] = new Thread(new Consumer(queue), "" + i);
            consThreads[i].start();
        }


    }

}
person Sharath    schedule 14.09.2014

Пожалуйста, обратитесь к приведенному ниже коду. Вы можете изменить постоянные значения на основе аргументов командной строки. Я проверил код, он работает в соответствии с вашим требованием.

import java.util.LinkedList;
import java.util.Queue;

public class ProducerConsumerProblem {
    public static int CAPACITY = 10; // At a time maximum of 10 tasks can be
                                        // produced.
    public static int PRODUCERS = 2;
    public static int CONSUMERS = 4;

    public static void main(String args[]) {
        Queue<String> mTasks = new LinkedList<String>();
        for (int i = 1; i <= PRODUCERS; i++) {
            Thread producer = new Thread(new Producer(mTasks));
            producer.setName("Producer " + i);
            producer.start();
        }
        for (int i = 1; i <= CONSUMERS; i++) {
            Thread consumer = new Thread(new Consumer(mTasks));
            consumer.setName("Consumer " + i);
            consumer.start();
        }

    }

}

class Producer implements Runnable {

    Queue<String> mSharedTasks;
    int taskCount = 1;

    public Producer(Queue<String> mSharedTasks) {
        super();
        this.mSharedTasks = mSharedTasks;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (mSharedTasks) {
                try {
                    if (mSharedTasks.size() == ProducerConsumerProblem.CAPACITY) {
                        System.out.println("Producer Waiting!!");
                        mSharedTasks.wait();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            while (mSharedTasks.size() != ProducerConsumerProblem.CAPACITY) {

                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                }

                String produceHere = Thread.currentThread().getName()
                        + "_Item number_" + taskCount++;

                synchronized (mSharedTasks) {
                    mSharedTasks.add(produceHere);
                    System.out.println(produceHere);
                    if (mSharedTasks.size() == 1) {
                        mSharedTasks.notifyAll(); // Informs consumer that there
                                                    // is something to consume.
                    }
                }
            }

        }
    }
}

class Consumer implements Runnable {
    Queue<String> mSharedTasks;

    public Consumer(Queue<String> mSharedTasks) {
        super();
        this.mSharedTasks = mSharedTasks;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (mSharedTasks) {
                if (mSharedTasks.isEmpty()) { // Checks whether there is no task
                                                // to consume.
                    try {
                        mSharedTasks.wait(); // Waits for producer to produce!
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

            }
            while (!mSharedTasks.isEmpty()) { // Consumes till task list is
                                                // empty
                try {
                    // Consumer consumes late hence producer has to wait...!
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                synchronized (mSharedTasks) {

                    System.out.println(Thread.currentThread().getName()
                            + " consumed " + mSharedTasks.poll());
                    if (mSharedTasks.size() == ProducerConsumerProblem.CAPACITY - 1)
                        mSharedTasks.notifyAll();
                }

            }

        }
    }

}
person Arpit Ratan    schedule 03.07.2015

person    schedule
comment
Я вижу тот же ответ tutorialspoint.com/javaexamples/thread_procon.htm - person ravindrab; 18.04.2016