Я хочу создать «сервер» julia, который содержит кэшированное значение и обслуживает его по запросу и обновляет значение, когда поступает обновление из других каналов.
Мой план сделать это таков: использовать julia ZMQ (zeromq), который прослушивает сокет REP (ответ) и доставляет значение для любого запроса, поступающего на этот сокет REP. Кроме того, в программе есть сокет SUB (подписка), который обновляет значение всякий раз, когда сокет получает что-либо.
Сокет REP блокируется с помощью ZMQ.recv. Возможно, разъем SUB тоже подходит, не уверен
Но в основном обе части должны работать независимо в цикле while, разделяя некоторую память (переменную).
Так что, возможно, это нужно сделать с помощью SharedArrays, порождая процессы
Но я просто не могу понять, как это сделать в коде. Например. Я могу @spawn каждый такой процесс, у одного есть REP, у другого есть сокет SUB, но я не знаю, как заставить их pid создать SharedArray
Может кто-нибудь помочь, пожалуйста?
Я также открыт для различных дизайнерских решений для решения проблемы (в основном данные постоянно обновляются из какого-то источника, и другие программы должны иметь возможность получать самую последнюю копию этих данных).
Спасибо, Имран.
EDIT: я получил простую версию, которая работает следующим образом: у нее есть 2 независимых сокета REP/REQ. Странно то, что иногда это работает, а иногда после нескольких вызовов readcache() и writecache (41) блоков либо в кэше чтения, либо в кэше записи ... но я не могу воспроизвести, так как иногда он просто работает плавно
Это правильный способ решить эту проблему в Джулии?
using ZMQ
type CT
a::Int
b::String
end
ct = CT(1,"a")
readport = 5551
readproc = @spawn readcacheproc(ct,readport)
writeport = 5552
writeproc = @spawn writecacheproc(ct,writeport)
# test as follows
# readcache() # expect [1 a]
# writecache("test") # expect [4 test]
# readcache() # expect [4 test]
function readcache(port=readport)
ctx=Context()
s=Socket(ctx,REQ)
ZMQ.connect(s,"tcp://localhost:$port")
ZMQ.send(s,"")
println(bytestring(ZMQ.recv(s)))
ZMQ.close(s)
ZMQ.close(ctx)
end
function writecache(value,port=writeport)
ctx=Context()
s=Socket(ctx,REQ)
ZMQ.connect(s,"tcp://localhost:$port")
ZMQ.send(s,"$value")
println(bytestring(ZMQ.recv(s)))
ZMQ.close(s)
ZMQ.close(ctx)
end
function readcacheproc(cache,port=readport)
ctx=Context()
s=Socket(ctx,REP)
ZMQ.bind(s,"tcp://*:$port")
done = false
while !done
msg = bytestring(ZMQ.recv(s)) # actual msg is ignored
ZMQ.send(s,"$(cache.a) $(cache.b)")
end
ZMQ.close(s)
ZMQ.close(ctx)
end
function writecacheproc(cache,port=writeport)
ctx=Context()
s=Socket(ctx,REP)
ZMQ.bind(s,"tcp://*:$port")
done = false
while !done
msg = bytestring(ZMQ.recv(s))
cache.a = length(msg)
cache.b = msg
ZMQ.send(s,"new cache: $(cache.a) $(cache.b)")
end
ZMQ.close(s)
ZMQ.close(ctx)
end