Потокобезопасность с Netty и Bukkit

Я создаю слушателей, когда канал читается и когда канал пишет в Netty. Слушатели обрабатываются в системе событий Bukkit, проблема не в этом, проблема в безопасности потоков. Bukkit Api запускается в основном потоке и должен быть синхронизирован, иначе сервер взорвется. Netty работает в нескольких потоках, поэтому перекрестная связь может быть затруднена с API Bukkit. Я спросил на форумах Bukkit, и лучший ответ, который я получил, заключался в том, чтобы создать AtomicBoolean и иметь цикл while, это решило бы проблему с синхронизацией отмены отправки и записи пакетов, но это не решает проблему изменения пакета что отправляется/пишется. когда событие вызывается в Bukkit, слушатели вызываются из каждого класса в @EventHandler. весь этот код должен быть синхронизирован с основным потоком, я не уверен, как бы я использовал синхронизацию в этом сценарии. извините за плохое форматирование кода, я не могу работать с этой системой очень хорошо.

public class ConnectionInjector extends ChannelDuplexHandler {

    private User user;
    private Channel channel;
    private PacketRecieveEvent recieve;
    private PacketSendEvent send;
    private boolean isInjected = false;
    private boolean isClosed = false;

    public ConnectionInjector(User user) {
        this.init(user.getPlayer());
    }
    public void close() {
        if (!this.isClosed) {
            this.isClosed = true;
            if (this.isInjected) {
                getChannel().eventLoop().submit(new Callable<Object>() {
                    @Override
                    public Object call() throws Exception {

                        getChannel().pipeline().remove(ConnectionInjector.this);
                        return null;

                    }

                });

                this.isInjected = false;
            }
        }
    }

    public boolean isInjected() {
        return this.isInjected;
    }

    public boolean isClosed() {
        return this.isClosed;
    }


    @Override
    public void write(ChannelHandlerContext context, Object packet, ChannelPromise channel) {
        if (this.isClosed()){
            throw new IllegalStateException("Connection closed already");
        }
        PacketData d = new PacketData(packet);
        send = new PacketSendEvent(user, d);//the event
        Bukkit.getScheduler().scheduleSyncDelayedTask(Main.getInstance(), new Runnable() {
            public void run() {
                Bukkit.getPluginManager().callEvent(send); //runnable puts it on the main thread
            }
        });//end of bukkit thread back on netty
        if (!send.isCancelled()){// this possible happens before the above code which is very bad
            try {
                if (send.getPacket().getRawPacket() == null){
                    throw new IllegalStateException("sent packet was null: " + send.getPacket().getRawPacket().getClass().getName());
                }
                super.write(context, send.getPacket().getRawPacket(), channel); //possible happens before send is even created.
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext context, final Object packet) throws Exception {
        PacketData p = new PacketData(packet);
        recieve = new PacketRecieveEvent(user, p);
        Bukkit.getScheduler().scheduleSyncDelayedTask(Main.getInstance(), new Runnable() {
            public void run() {
                Bukkit.getPluginManager().callEvent(recieve); 
            }

        });
        if (recieve.isCancelled()){
            super.channelRead(context, recieve.getPacket().getRawPacket());
        }


    }

    public void injectfakePacket(Object packet) {
        if (this.isClosed()){
            throw new IllegalStateException("Injector is closed");
        }
        this.getChannel().pipeline().context("encoder").fireChannelRead(packet);
    }

    public void init(Player player) {
      this.channel = (Channel) Nms.getNetChannel(player);
        this.channel.pipeline().addBefore("packet_handler", "epickitpvp", this);
        this.isInjected = true;
    }

    public Channel getChannel() {
        if (this.channel == null){
            throw new IllegalStateException("channel is null");
        }
        return this.channel;
    }
} 

person user3736638    schedule 17.07.2014    source источник
comment
Вам не нужно использовать синхронизацию, если в этом нет необходимости. Ваши поля просто должны быть завершены или сделаны изменчивыми, поскольку непримитивные поля nonfloat/nondouble в основном не записываются, кроме как в конструкторе, а те, которые записываются, являются примитивами, которые требуют только от вас сделать volatile. Кстати, это снова я :P   -  person    schedule 21.07.2014


Ответы (2)


Вы можете получить доступ к Scheudler Bukkit с потоками ASync, поэтому вы можете запланировать новую задачу синхронизации внутри потока ASync, потому что поток синхронизации будет вызываться только тогда, когда основной поток будет готов.

Сложное короткое: запланируйте новую задачу синхронизации с помощью

Bukkit.getScheudler().scheudleSyncTask(PLUGIN, new Runnable() {
    public void run() {
        // Bukkit methods here
    }
});

внутри вашего потока ASync, и все в порядке.

person PreFiXAUT    schedule 17.07.2014
comment
Вот что я делал, вы смотрели на мой код. проблема заключается в получении и настройке информации между двумя потоками. как setCancelled(true), getPacket().setField(a, 1) - person user3736638; 18.07.2014
comment
Извините не видел этого. Тогда я как-то не понимаю, где у вас проблема, потому что синхронизация с Bukkit уже решена. - person PreFiXAUT; 18.07.2014
comment
только вызов события был синхронизирован, остальное - нет, если что-то не синхронизировано, другой поток может получить логическое значение еще до того, как оно будет создано. так как потоки работают с разной скоростью и вытягивают/устанавливают информацию в разное время - person user3736638; 18.07.2014

Я не совсем понимаю, чего вы пытаетесь достичь с помощью этого кода, поэтому я не могу дать вам прямой ответ, но я могу прояснить несколько проблем, которые у вас, вероятно, возникают с потоками.

Прежде всего, Java может и будет кэшировать переменные для каждого потока, что означает, что вы получите несколько копий одной и той же переменной с разными значениями в зависимости от того, в каком потоке вы находитесь. Такое кэширование может вызвать много путаницы. для людей, которые не в курсе. Однако вы можете указать Java сохранять переменную в согласованном состоянии между потоками, используя ключевое слово volatile. Дополнительную информацию о volatile можно найти здесь: http://www.javamex.com/tutorials/synchronization_volatile.shtml

Если у вас возникли проблемы с пониманием ключевого слова synchronized в Java, я предлагаю вам сначала немного изучить, что такое семафор, а затем вернуться к вопросу о синхронизации. Хорошее понимание того, что такое семафор и как он работает, облегчит вам понимание того, как работать в многопоточной среде, где потоки должны совместно использовать состояние (однако этой ситуации следует избегать, когда это возможно).

Вы также можете найти API параллельных коллекций Java очень полезным: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html

person Timotei    schedule 18.07.2014