сервер julia с асинхронными частями и общей памятью

Я хочу создать «сервер» julia, который содержит кэшированное значение и обслуживает его по запросу и обновляет значение, когда поступает обновление из других каналов.

Мой план сделать это таков: использовать julia ZMQ (zeromq), который прослушивает сокет REP (ответ) и доставляет значение для любого запроса, поступающего на этот сокет REP. Кроме того, в программе есть сокет SUB (подписка), который обновляет значение всякий раз, когда сокет получает что-либо.

Сокет REP блокируется с помощью ZMQ.recv. Возможно, разъем SUB тоже подходит, не уверен

Но в основном обе части должны работать независимо в цикле while, разделяя некоторую память (переменную).

Так что, возможно, это нужно сделать с помощью SharedArrays, порождая процессы

Но я просто не могу понять, как это сделать в коде. Например. Я могу @spawn каждый такой процесс, у одного есть REP, у другого есть сокет SUB, но я не знаю, как заставить их pid создать SharedArray

Может кто-нибудь помочь, пожалуйста?

Я также открыт для различных дизайнерских решений для решения проблемы (в основном данные постоянно обновляются из какого-то источника, и другие программы должны иметь возможность получать самую последнюю копию этих данных).

Спасибо, Имран.

EDIT: я получил простую версию, которая работает следующим образом: у нее есть 2 независимых сокета REP/REQ. Странно то, что иногда это работает, а иногда после нескольких вызовов readcache() и writecache (41) блоков либо в кэше чтения, либо в кэше записи ... но я не могу воспроизвести, так как иногда он просто работает плавно

Это правильный способ решить эту проблему в Джулии?

using ZMQ

type CT
 a::Int
 b::String
end
ct = CT(1,"a")

readport = 5551
readproc = @spawn readcacheproc(ct,readport)

writeport = 5552
writeproc = @spawn writecacheproc(ct,writeport)

# test as follows
# readcache() # expect [1 a]
# writecache("test") # expect [4 test]
# readcache() # expect [4 test]

function readcache(port=readport)
 ctx=Context()
 s=Socket(ctx,REQ)
 ZMQ.connect(s,"tcp://localhost:$port")

 ZMQ.send(s,"")
 println(bytestring(ZMQ.recv(s)))

 ZMQ.close(s)
 ZMQ.close(ctx)
end

function writecache(value,port=writeport)
 ctx=Context()
 s=Socket(ctx,REQ)
 ZMQ.connect(s,"tcp://localhost:$port")

 ZMQ.send(s,"$value")
 println(bytestring(ZMQ.recv(s)))

 ZMQ.close(s)
 ZMQ.close(ctx)
end

function readcacheproc(cache,port=readport)

 ctx=Context()
 s=Socket(ctx,REP)
 ZMQ.bind(s,"tcp://*:$port")

 done = false
 while !done
  msg = bytestring(ZMQ.recv(s)) # actual msg is ignored
  ZMQ.send(s,"$(cache.a) $(cache.b)")
 end

 ZMQ.close(s)
 ZMQ.close(ctx)
end

function writecacheproc(cache,port=writeport)

 ctx=Context()
 s=Socket(ctx,REP)
 ZMQ.bind(s,"tcp://*:$port")

 done = false
 while !done
  msg = bytestring(ZMQ.recv(s))
  cache.a = length(msg)
  cache.b = msg
  ZMQ.send(s,"new cache: $(cache.a) $(cache.b)")
 end

 ZMQ.close(s)
 ZMQ.close(ctx)
end

person imran    schedule 07.03.2016    source источник


Ответы (1)


Кажется, работает следующее, хотя я не знаю, лучший ли это способ решить эту проблему.

using ZMQ

type CT
 a::Int
 b::String
end
ct = CT(1,"a")

readport = 5551
readproc = @spawn readcacheproc(ct,readport)

writeport = 5552
writeproc = @spawn writecacheproc(ct,writeport)

# test as follows
# readcache() # expect [1 a]
# writecache("test") # expect [4 test]
# readcache() # expect [4 test]

function readcache(port=readport)
 ctx=Context()
 s=Socket(ctx,REQ)
 ZMQ.connect(s,"tcp://localhost:$port")

 ZMQ.send(s,"")
 println(bytestring(ZMQ.recv(s)))

 ZMQ.close(s)
 ZMQ.close(ctx)
end

function writecache(value,port=writeport)
 ctx=Context()
 s=Socket(ctx,REQ)
 ZMQ.connect(s,"tcp://localhost:$port")

 ZMQ.send(s,"$value")
 println(bytestring(ZMQ.recv(s)))

 ZMQ.close(s)
 ZMQ.close(ctx)
end

function readcacheproc(cache,port=readport)

 ctx=Context()
 s=Socket(ctx,REP)
 ZMQ.bind(s,"tcp://*:$port")

 done = false
 while !done
  msg = bytestring(ZMQ.recv(s)) # actual msg is ignored
  ZMQ.send(s,"$(cache.a) $(cache.b)")
 end

 ZMQ.close(s)
 ZMQ.close(ctx)
end

function writecacheproc(cache,port=writeport)

 ctx=Context()
 s=Socket(ctx,REP)
 ZMQ.bind(s,"tcp://*:$port")

 done = false
 while !done
  msg = bytestring(ZMQ.recv(s))
  cache.a = length(msg)
  cache.b = msg
  ZMQ.send(s,"new cache: $(cache.a) $(cache.b)")
 end

 ZMQ.close(s)
 ZMQ.close(ctx)
end
person imran    schedule 07.03.2016