diff --git a/backend/src/aurinko/registry.gleam b/backend/src/aurinko/registry.gleam index 49dead5..a0bf833 100644 --- a/backend/src/aurinko/registry.gleam +++ b/backend/src/aurinko/registry.gleam @@ -1,26 +1,17 @@ import chip import gleam/erlang/process import gleam/otp/actor -import gleam/otp/supervisor -pub type Id { - Updater -} +pub type Store(message, id) = + chip.Registry(message, id, Nil) -pub type Store(message) = - chip.Registry(message, Id, Nil) - -pub fn start() -> Result(Store(message), actor.StartError) { +pub fn start() -> Result(Store(message, id), actor.StartError) { chip.start() } -pub fn childspec() { - supervisor.worker(fn(_index) { start() }) -} - pub fn register( - store: Store(message), - id: Id, + store: Store(message, id), + id: id, subject: process.Subject(message), ) { chip.new(subject) @@ -28,6 +19,6 @@ pub fn register( |> chip.register(store, _) } -pub fn get(store: Store(message), id: Id) { +pub fn get(store: Store(message, id), id: id) { chip.find(store, id) } diff --git a/backend/src/aurinko/updater.gleam b/backend/src/aurinko/updater.gleam index 5ef48fb..1643648 100644 --- a/backend/src/aurinko/updater.gleam +++ b/backend/src/aurinko/updater.gleam @@ -1,7 +1,10 @@ import ap_systems/api import ap_systems/module_power -import aurinko/registry +import aurinko/updater/messaging.{ + type Message, GetData, GetDataResponse, PeriodicUpdate, Update, +} import aurinko/updater/pubsub +import aurinko/updater/registry import aurinko/updater/types.{ type Dataset, type ModuleDataset, Dataset, ModuleDataset, ModulePower, } @@ -11,7 +14,6 @@ import biscotto import gleam/dict import gleam/dynamic import gleam/erlang/process -import gleam/io import gleam/list import gleam/option import gleam/order @@ -24,16 +26,6 @@ const update_interval = 30_000 const login_cookie_expiry = 86_400 -pub type GetDataResponse { - GetDataResponse(dataset: option.Option(Dataset)) -} - -pub type Message { - Update - PeriodicUpdate - GetData(reply_subject: process.Subject(GetDataResponse)) -} - pub type ModuleIDs = api.ModuleIDs @@ -81,7 +73,7 @@ pub fn start( username: String, password: String, pubsub: pubsub.UpdaterPubSub, - updater_registry: registry.Store(Message), + updater_registry: registry.UpdaterRegistry, ) { let spec = actor.Spec( @@ -91,7 +83,7 @@ pub fn start( process.new_selector() |> process.selecting(self_subject, fn(a) { a }) registry.register(updater_registry, registry.Updater, self_subject) - process.send_after(self_subject, 0, PeriodicUpdate) + process.send_after(self_subject, 0, Update) process.send_after(self_subject, update_interval, PeriodicUpdate) actor.Ready( @@ -147,8 +139,6 @@ fn handle_message(message: Message, state: State) -> actor.Next(Message, State) } }) - io.debug(#("Next state", next)) - case next { Ok(next) -> next Error(err) -> { diff --git a/backend/src/aurinko/updater/messaging.gleam b/backend/src/aurinko/updater/messaging.gleam new file mode 100644 index 0000000..baa9bd9 --- /dev/null +++ b/backend/src/aurinko/updater/messaging.gleam @@ -0,0 +1,13 @@ +import aurinko/updater/types +import gleam/erlang/process +import gleam/option + +pub type GetDataResponse { + GetDataResponse(dataset: option.Option(types.Dataset)) +} + +pub type Message { + Update + PeriodicUpdate + GetData(reply_subject: process.Subject(GetDataResponse)) +} diff --git a/backend/src/aurinko/updater/pubsub.gleam b/backend/src/aurinko/updater/pubsub.gleam index 63d6ede..9372b1b 100644 --- a/backend/src/aurinko/updater/pubsub.gleam +++ b/backend/src/aurinko/updater/pubsub.gleam @@ -13,3 +13,5 @@ pub type UpdaterPubSub = pub fn init() -> actor.StartResult(glubsub.Message(OutMessage)) { generic_pubsub.init() } + +pub const from_subject = generic_pubsub.from_subject diff --git a/backend/src/aurinko/updater/registry.gleam b/backend/src/aurinko/updater/registry.gleam new file mode 100644 index 0000000..aa04260 --- /dev/null +++ b/backend/src/aurinko/updater/registry.gleam @@ -0,0 +1,20 @@ +import aurinko/registry +import aurinko/updater/messaging +import gleam/otp/actor + +pub type Id { + Updater +} + +pub type UpdaterRegistry = + registry.Store(messaging.Message, Id) + +pub fn start() -> Result(UpdaterRegistry, actor.StartError) { + registry.start() +} + +pub const register = registry.register + +pub fn get(updater_registry: UpdaterRegistry) { + registry.get(updater_registry, Updater) +} diff --git a/backend/src/aurinko/web.gleam b/backend/src/aurinko/web.gleam index 7233f2f..85ce10d 100644 --- a/backend/src/aurinko/web.gleam +++ b/backend/src/aurinko/web.gleam @@ -1,8 +1,9 @@ -import aurinko/registry -import aurinko/updater import aurinko/updater/pubsub +import aurinko/updater/registry import aurinko/web/router +import aurinko/web/ws import gleam/dynamic +import gleam/http/request import gleam/otp/actor import gleam/result import mist @@ -11,12 +12,25 @@ import wisp pub fn init( port: Int, secret_key_base: String, - _updater_pubsub: pubsub.UpdaterPubSub, - _updater_registry: registry.Store(updater.Message), + updater_pubsub: pubsub.UpdaterPubSub, + updater_registry: registry.UpdaterRegistry, ) { wisp.configure_logger() - wisp.mist_handler(router.handle_request, secret_key_base) + let wisp_side = wisp.mist_handler(router.handle_request, secret_key_base) + + fn(req) { + case request.path_segments(req) { + ["ws"] -> + mist.websocket( + request: req, + on_init: ws.on_init(_, updater_pubsub, updater_registry), + on_close: ws.on_close, + handler: ws.handle_message, + ) + _ -> wisp_side(req) + } + } |> mist.new() |> mist.port(port) |> mist.start_http() diff --git a/backend/src/aurinko/web/ws.gleam b/backend/src/aurinko/web/ws.gleam new file mode 100644 index 0000000..14cfea9 --- /dev/null +++ b/backend/src/aurinko/web/ws.gleam @@ -0,0 +1,89 @@ +import aurinko/updater +import aurinko/updater/messaging +import aurinko/updater/pubsub +import aurinko/updater/registry +import aurinko/web/ws/encoder +import gleam/erlang/process +import gleam/json +import gleam/option +import gleam/otp/actor +import glubsub +import mist + +pub type State { + State( + updater_pubsub: pubsub.UpdaterPubSub, + updater_registry: registry.UpdaterRegistry, + pubsub_subject: process.Subject(pubsub.OutMessage), + ) +} + +pub fn on_init( + conn: mist.WebsocketConnection, + updater_pubsub: pubsub.UpdaterPubSub, + updater_registry: registry.UpdaterRegistry, +) { + let pubsub_subject = process.new_subject() + let assert Ok(_) = glubsub.subscribe(updater_pubsub, pubsub_subject) + + send_initial_dataset(conn, updater_registry) + + let self_selector = + process.new_selector() |> process.selecting(pubsub_subject, fn(m) { m }) + let state = State(updater_pubsub, updater_registry, pubsub_subject) + #(state, option.Some(self_selector)) +} + +pub fn on_close(state: State) { + let _ = glubsub.unsubscribe(state.updater_pubsub, state.pubsub_subject) + Nil +} + +pub fn handle_message( + state: State, + conn: mist.WebsocketConnection, + message: mist.WebsocketMessage(pubsub.OutMessage), +) { + case message { + mist.Custom(pubsub.NewData(dataset)) -> { + let dataset = encoder.dataset(dataset) + let output = + json.object([ + #("type", json.string("data_update")), + #("dataset", dataset), + ]) + |> json.to_string() + let assert Ok(_) = mist.send_text_frame(conn, output) + actor.continue(state) + } + mist.Closed | mist.Shutdown -> actor.Stop(process.Normal) + _ -> actor.continue(state) + } +} + +fn send_initial_dataset( + conn: mist.WebsocketConnection, + updater_registry: registry.UpdaterRegistry, +) { + let updater_subject = registry.get(updater_registry) + case updater_subject { + Ok(sub) -> { + let messaging.GetDataResponse(initial_data) = updater.get_data(sub) + let dataset = case initial_data { + option.Some(dataset) -> encoder.dataset(dataset) + option.None -> json.null() + } + + let output = + json.object([ + #("type", json.string("initial_data")), + #("dataset", dataset), + ]) + |> json.to_string() + + let assert Ok(_) = mist.send_text_frame(conn, output) + Nil + } + Error(_) -> Nil + } +} diff --git a/backend/src/aurinko/web/ws/encoder.gleam b/backend/src/aurinko/web/ws/encoder.gleam new file mode 100644 index 0000000..cdb82af --- /dev/null +++ b/backend/src/aurinko/web/ws/encoder.gleam @@ -0,0 +1,72 @@ +import ap_systems/module_power +import aurinko/updater/types +import birl +import gleam/dict +import gleam/json +import gleam/list +import gleamy/red_black_tree_set + +pub fn production_info(info: types.ProductionInfo) { + json.object([ + #("co2", json.float(info.co2)), + #("duration", json.int(info.duration)), + #("last_power", json.float(info.last_power)), + #("lifetime", json.float(info.lifetime)), + #("today", json.float(info.today)), + ]) +} + +pub fn momentary_power(power: types.MomentaryPower) { + case power { + module_power.NotConnected -> json.string("NC") + module_power.Watts(int) -> json.int(int) + } +} + +pub fn module_power(power: types.ModulePower) { + json.object([ + #("id", json.string(power.id)), + #( + "power", + json.object( + power.power + |> dict.to_list() + |> list.map(fn(item) { + let #(k, v) = item + #(birl.to_iso8601(k), momentary_power(v)) + }), + ), + ), + ]) +} + +pub fn module_dataset(dataset: types.ModuleDataset) { + json.object([ + #( + "times", + dataset.times + |> red_black_tree_set.foldr([], fn(acc, time) { + [birl.to_iso8601(time), ..acc] + }) + |> json.array(json.string), + ), + #( + "datas", + json.object( + dataset.datas + |> dict.to_list() + |> list.map(fn(item) { + let #(k, v) = item + #(k, module_power(v)) + }), + ), + ), + ]) +} + +pub fn dataset(dataset: types.Dataset) { + json.object([ + #("production_info", production_info(dataset.production_info)), + #("modules", module_dataset(dataset.modules)), + ]) +} diff --git a/backend/src/aurinko_backend.gleam b/backend/src/aurinko_backend.gleam index bceb8d2..954e709 100644 --- a/backend/src/aurinko_backend.gleam +++ b/backend/src/aurinko_backend.gleam @@ -1,7 +1,6 @@ -import aurinko/pubsub as generic_pubsub -import aurinko/registry import aurinko/updater import aurinko/updater/pubsub +import aurinko/updater/registry import aurinko/web import dot_env import dot_env/env @@ -26,7 +25,7 @@ pub fn main() { |> supervisor.add( supervisor.worker(fn(_) { pubsub.init() }) |> supervisor.returning(fn(_, updater_pubsub_subject) { - generic_pubsub.from_subject(updater_pubsub_subject) + pubsub.from_subject(updater_pubsub_subject) }), ) |> supervisor.add(