Rudimentary WebSocket interface

This commit is contained in:
Mikko Ahlroth 2024-07-07 11:40:30 +03:00
parent f8b6e1af9e
commit a52629ceaa
9 changed files with 229 additions and 39 deletions

View file

@ -1,26 +1,17 @@
import chip
import gleam/erlang/process
import gleam/otp/actor
import gleam/otp/supervisor
pub type Id {
Updater
}
pub type Store(message, id) =
chip.Registry(message, id, Nil)
pub type Store(message) =
chip.Registry(message, Id, Nil)
pub fn start() -> Result(Store(message), actor.StartError) {
pub fn start() -> Result(Store(message, id), actor.StartError) {
chip.start()
}
pub fn childspec() {
supervisor.worker(fn(_index) { start() })
}
pub fn register(
store: Store(message),
id: Id,
store: Store(message, id),
id: id,
subject: process.Subject(message),
) {
chip.new(subject)
@ -28,6 +19,6 @@ pub fn register(
|> chip.register(store, _)
}
pub fn get(store: Store(message), id: Id) {
pub fn get(store: Store(message, id), id: id) {
chip.find(store, id)
}

View file

@ -1,7 +1,10 @@
import ap_systems/api
import ap_systems/module_power
import aurinko/registry
import aurinko/updater/messaging.{
type Message, GetData, GetDataResponse, PeriodicUpdate, Update,
}
import aurinko/updater/pubsub
import aurinko/updater/registry
import aurinko/updater/types.{
type Dataset, type ModuleDataset, Dataset, ModuleDataset, ModulePower,
}
@ -11,7 +14,6 @@ import biscotto
import gleam/dict
import gleam/dynamic
import gleam/erlang/process
import gleam/io
import gleam/list
import gleam/option
import gleam/order
@ -24,16 +26,6 @@ const update_interval = 30_000
const login_cookie_expiry = 86_400
pub type GetDataResponse {
GetDataResponse(dataset: option.Option(Dataset))
}
pub type Message {
Update
PeriodicUpdate
GetData(reply_subject: process.Subject(GetDataResponse))
}
pub type ModuleIDs =
api.ModuleIDs
@ -81,7 +73,7 @@ pub fn start(
username: String,
password: String,
pubsub: pubsub.UpdaterPubSub,
updater_registry: registry.Store(Message),
updater_registry: registry.UpdaterRegistry,
) {
let spec =
actor.Spec(
@ -91,7 +83,7 @@ pub fn start(
process.new_selector() |> process.selecting(self_subject, fn(a) { a })
registry.register(updater_registry, registry.Updater, self_subject)
process.send_after(self_subject, 0, PeriodicUpdate)
process.send_after(self_subject, 0, Update)
process.send_after(self_subject, update_interval, PeriodicUpdate)
actor.Ready(
@ -147,8 +139,6 @@ fn handle_message(message: Message, state: State) -> actor.Next(Message, State)
}
})
io.debug(#("Next state", next))
case next {
Ok(next) -> next
Error(err) -> {

View file

@ -0,0 +1,13 @@
import aurinko/updater/types
import gleam/erlang/process
import gleam/option
pub type GetDataResponse {
GetDataResponse(dataset: option.Option(types.Dataset))
}
pub type Message {
Update
PeriodicUpdate
GetData(reply_subject: process.Subject(GetDataResponse))
}

View file

@ -13,3 +13,5 @@ pub type UpdaterPubSub =
pub fn init() -> actor.StartResult(glubsub.Message(OutMessage)) {
generic_pubsub.init()
}
pub const from_subject = generic_pubsub.from_subject

View file

@ -0,0 +1,20 @@
import aurinko/registry
import aurinko/updater/messaging
import gleam/otp/actor
pub type Id {
Updater
}
pub type UpdaterRegistry =
registry.Store(messaging.Message, Id)
pub fn start() -> Result(UpdaterRegistry, actor.StartError) {
registry.start()
}
pub const register = registry.register
pub fn get(updater_registry: UpdaterRegistry) {
registry.get(updater_registry, Updater)
}

View file

@ -1,8 +1,9 @@
import aurinko/registry
import aurinko/updater
import aurinko/updater/pubsub
import aurinko/updater/registry
import aurinko/web/router
import aurinko/web/ws
import gleam/dynamic
import gleam/http/request
import gleam/otp/actor
import gleam/result
import mist
@ -11,12 +12,25 @@ import wisp
pub fn init(
port: Int,
secret_key_base: String,
_updater_pubsub: pubsub.UpdaterPubSub,
_updater_registry: registry.Store(updater.Message),
updater_pubsub: pubsub.UpdaterPubSub,
updater_registry: registry.UpdaterRegistry,
) {
wisp.configure_logger()
wisp.mist_handler(router.handle_request, secret_key_base)
let wisp_side = wisp.mist_handler(router.handle_request, secret_key_base)
fn(req) {
case request.path_segments(req) {
["ws"] ->
mist.websocket(
request: req,
on_init: ws.on_init(_, updater_pubsub, updater_registry),
on_close: ws.on_close,
handler: ws.handle_message,
)
_ -> wisp_side(req)
}
}
|> mist.new()
|> mist.port(port)
|> mist.start_http()

View file

@ -0,0 +1,89 @@
import aurinko/updater
import aurinko/updater/messaging
import aurinko/updater/pubsub
import aurinko/updater/registry
import aurinko/web/ws/encoder
import gleam/erlang/process
import gleam/json
import gleam/option
import gleam/otp/actor
import glubsub
import mist
pub type State {
State(
updater_pubsub: pubsub.UpdaterPubSub,
updater_registry: registry.UpdaterRegistry,
pubsub_subject: process.Subject(pubsub.OutMessage),
)
}
pub fn on_init(
conn: mist.WebsocketConnection,
updater_pubsub: pubsub.UpdaterPubSub,
updater_registry: registry.UpdaterRegistry,
) {
let pubsub_subject = process.new_subject()
let assert Ok(_) = glubsub.subscribe(updater_pubsub, pubsub_subject)
send_initial_dataset(conn, updater_registry)
let self_selector =
process.new_selector() |> process.selecting(pubsub_subject, fn(m) { m })
let state = State(updater_pubsub, updater_registry, pubsub_subject)
#(state, option.Some(self_selector))
}
pub fn on_close(state: State) {
let _ = glubsub.unsubscribe(state.updater_pubsub, state.pubsub_subject)
Nil
}
pub fn handle_message(
state: State,
conn: mist.WebsocketConnection,
message: mist.WebsocketMessage(pubsub.OutMessage),
) {
case message {
mist.Custom(pubsub.NewData(dataset)) -> {
let dataset = encoder.dataset(dataset)
let output =
json.object([
#("type", json.string("data_update")),
#("dataset", dataset),
])
|> json.to_string()
let assert Ok(_) = mist.send_text_frame(conn, output)
actor.continue(state)
}
mist.Closed | mist.Shutdown -> actor.Stop(process.Normal)
_ -> actor.continue(state)
}
}
fn send_initial_dataset(
conn: mist.WebsocketConnection,
updater_registry: registry.UpdaterRegistry,
) {
let updater_subject = registry.get(updater_registry)
case updater_subject {
Ok(sub) -> {
let messaging.GetDataResponse(initial_data) = updater.get_data(sub)
let dataset = case initial_data {
option.Some(dataset) -> encoder.dataset(dataset)
option.None -> json.null()
}
let output =
json.object([
#("type", json.string("initial_data")),
#("dataset", dataset),
])
|> json.to_string()
let assert Ok(_) = mist.send_text_frame(conn, output)
Nil
}
Error(_) -> Nil
}
}

View file

@ -0,0 +1,72 @@
import ap_systems/module_power
import aurinko/updater/types
import birl
import gleam/dict
import gleam/json
import gleam/list
import gleamy/red_black_tree_set
pub fn production_info(info: types.ProductionInfo) {
json.object([
#("co2", json.float(info.co2)),
#("duration", json.int(info.duration)),
#("last_power", json.float(info.last_power)),
#("lifetime", json.float(info.lifetime)),
#("today", json.float(info.today)),
])
}
pub fn momentary_power(power: types.MomentaryPower) {
case power {
module_power.NotConnected -> json.string("NC")
module_power.Watts(int) -> json.int(int)
}
}
pub fn module_power(power: types.ModulePower) {
json.object([
#("id", json.string(power.id)),
#(
"power",
json.object(
power.power
|> dict.to_list()
|> list.map(fn(item) {
let #(k, v) = item
#(birl.to_iso8601(k), momentary_power(v))
}),
),
),
])
}
pub fn module_dataset(dataset: types.ModuleDataset) {
json.object([
#(
"times",
dataset.times
|> red_black_tree_set.foldr([], fn(acc, time) {
[birl.to_iso8601(time), ..acc]
})
|> json.array(json.string),
),
#(
"datas",
json.object(
dataset.datas
|> dict.to_list()
|> list.map(fn(item) {
let #(k, v) = item
#(k, module_power(v))
}),
),
),
])
}
pub fn dataset(dataset: types.Dataset) {
json.object([
#("production_info", production_info(dataset.production_info)),
#("modules", module_dataset(dataset.modules)),
])
}

View file

@ -1,7 +1,6 @@
import aurinko/pubsub as generic_pubsub
import aurinko/registry
import aurinko/updater
import aurinko/updater/pubsub
import aurinko/updater/registry
import aurinko/web
import dot_env
import dot_env/env
@ -26,7 +25,7 @@ pub fn main() {
|> supervisor.add(
supervisor.worker(fn(_) { pubsub.init() })
|> supervisor.returning(fn(_, updater_pubsub_subject) {
generic_pubsub.from_subject(updater_pubsub_subject)
pubsub.from_subject(updater_pubsub_subject)
}),
)
|> supervisor.add(