diff --git a/backend/src/aurinko/pubsub.gleam b/backend/src/aurinko/pubsub.gleam new file mode 100644 index 0000000..24b11b5 --- /dev/null +++ b/backend/src/aurinko/pubsub.gleam @@ -0,0 +1,25 @@ +import gleam/dynamic +import gleam/erlang/process +import gleam/otp/actor +import gleam/result +import glubsub + +pub type PubSubSubject(msg) = + process.Subject(glubsub.Message(msg)) + +pub type PubSub(msg) = + glubsub.Topic(msg) + +pub fn init() -> actor.StartResult(glubsub.Message(msg)) { + use topic <- result.try( + glubsub.new_topic() + |> result.map_error(fn(err) { actor.InitCrashed(dynamic.from(err)) }), + ) + + let glubsub.Topic(subject) = topic + Ok(subject) +} + +pub fn from_subject(subject: PubSubSubject(msg)) { + glubsub.Topic(subject) +} diff --git a/backend/src/aurinko/registry.gleam b/backend/src/aurinko/registry.gleam new file mode 100644 index 0000000..49dead5 --- /dev/null +++ b/backend/src/aurinko/registry.gleam @@ -0,0 +1,33 @@ +import chip +import gleam/erlang/process +import gleam/otp/actor +import gleam/otp/supervisor + +pub type Id { + Updater +} + +pub type Store(message) = + chip.Registry(message, Id, Nil) + +pub fn start() -> Result(Store(message), actor.StartError) { + chip.start() +} + +pub fn childspec() { + supervisor.worker(fn(_index) { start() }) +} + +pub fn register( + store: Store(message), + id: Id, + subject: process.Subject(message), +) { + chip.new(subject) + |> chip.tag(id) + |> chip.register(store, _) +} + +pub fn get(store: Store(message), id: Id) { + chip.find(store, id) +} diff --git a/backend/src/aurinko/updater.gleam b/backend/src/aurinko/updater.gleam index 7361575..5ef48fb 100644 --- a/backend/src/aurinko/updater.gleam +++ b/backend/src/aurinko/updater.gleam @@ -1,5 +1,6 @@ import ap_systems/api import ap_systems/module_power +import aurinko/registry import aurinko/updater/pubsub import aurinko/updater/types.{ type Dataset, type ModuleDataset, Dataset, ModuleDataset, ModulePower, @@ -19,7 +20,7 @@ import gleam/result import gleamy/red_black_tree_set.{type Set} import glubsub -const update_interval = 10_000 +const update_interval = 30_000 const login_cookie_expiry = 86_400 @@ -76,7 +77,12 @@ pub fn get_data(subject: process.Subject(Message)) { process.call(subject, fn(reply_subject) { GetData(reply_subject) }, 5000) } -pub fn start(username: String, password: String, pubsub: pubsub.UpdaterPubSub) { +pub fn start( + username: String, + password: String, + pubsub: pubsub.UpdaterPubSub, + updater_registry: registry.Store(Message), +) { let spec = actor.Spec( init: fn() { @@ -84,6 +90,8 @@ pub fn start(username: String, password: String, pubsub: pubsub.UpdaterPubSub) { let selector = process.new_selector() |> process.selecting(self_subject, fn(a) { a }) + registry.register(updater_registry, registry.Updater, self_subject) + process.send_after(self_subject, 0, PeriodicUpdate) process.send_after(self_subject, update_interval, PeriodicUpdate) actor.Ready( @@ -125,7 +133,7 @@ fn handle_message(message: Message, state: State) -> actor.Next(Message, State) let _ = case new_state.dataset { option.Some(dataset) -> - glubsub.broadcast(state.pubsub.out, pubsub.NewData(dataset)) + glubsub.broadcast(state.pubsub, pubsub.NewData(dataset)) option.None -> Ok(Nil) } diff --git a/backend/src/aurinko/updater/pubsub.gleam b/backend/src/aurinko/updater/pubsub.gleam index b7ae110..63d6ede 100644 --- a/backend/src/aurinko/updater/pubsub.gleam +++ b/backend/src/aurinko/updater/pubsub.gleam @@ -1,16 +1,15 @@ +import aurinko/pubsub as generic_pubsub import aurinko/updater/types -import gleam/result +import gleam/otp/actor import glubsub pub type OutMessage { NewData(dataset: types.Dataset) } -pub type UpdaterPubSub { - UpdaterPubSub(out: glubsub.Topic(OutMessage)) -} +pub type UpdaterPubSub = + generic_pubsub.PubSub(OutMessage) -pub fn init() { - use out <- result.try(glubsub.new_topic()) - Ok(UpdaterPubSub(out)) +pub fn init() -> actor.StartResult(glubsub.Message(OutMessage)) { + generic_pubsub.init() } diff --git a/backend/src/aurinko/web.gleam b/backend/src/aurinko/web.gleam index 0762feb..7233f2f 100644 --- a/backend/src/aurinko/web.gleam +++ b/backend/src/aurinko/web.gleam @@ -1,5 +1,10 @@ +import aurinko/registry +import aurinko/updater import aurinko/updater/pubsub import aurinko/web/router +import gleam/dynamic +import gleam/otp/actor +import gleam/result import mist import wisp @@ -7,12 +12,15 @@ pub fn init( port: Int, secret_key_base: String, _updater_pubsub: pubsub.UpdaterPubSub, + _updater_registry: registry.Store(updater.Message), ) { wisp.configure_logger() - let assert Ok(_) = - wisp.mist_handler(router.handle_request, secret_key_base) - |> mist.new() - |> mist.port(port) - |> mist.start_http() + wisp.mist_handler(router.handle_request, secret_key_base) + |> mist.new() + |> mist.port(port) + |> mist.start_http() + |> result.map_error(fn(glisten_error) { + actor.InitCrashed(dynamic.from(glisten_error)) + }) } diff --git a/backend/src/aurinko_backend.gleam b/backend/src/aurinko_backend.gleam index 8eb6a13..bceb8d2 100644 --- a/backend/src/aurinko_backend.gleam +++ b/backend/src/aurinko_backend.gleam @@ -1,5 +1,5 @@ -import ap_systems/api -import ap_systems/module_power +import aurinko/pubsub as generic_pubsub +import aurinko/registry import aurinko/updater import aurinko/updater/pubsub import aurinko/web @@ -7,10 +7,7 @@ import dot_env import dot_env/env import gleam/erlang/process import gleam/int -import gleam/io -import gleam/result -import gleam/string -import glubsub +import gleam/otp/supervisor pub fn main() { dot_env.new_with_path("./.env") @@ -23,29 +20,34 @@ pub fn main() { let assert Ok(port) = int.parse(port) let assert Ok(secret_key_base) = env.get("SECRET_KEY_BASE") - let assert Ok(updater_pubsub) = pubsub.init() - let assert Ok(subject) = updater.start(username, password, updater_pubsub) - let assert Ok(_) = web.init(port, secret_key_base, updater_pubsub) - io.debug(subject) - process.sleep(15_000) - io.debug(updater.get_data(subject)) + let assert Ok(_supervisor) = + supervisor.start(fn(children) { + children + |> supervisor.add( + supervisor.worker(fn(_) { pubsub.init() }) + |> supervisor.returning(fn(_, updater_pubsub_subject) { + generic_pubsub.from_subject(updater_pubsub_subject) + }), + ) + |> supervisor.add( + supervisor.worker(fn(_) { registry.start() }) + |> supervisor.returning(fn(updater_pubsub, registry) { + #(updater_pubsub, registry) + }), + ) + |> supervisor.add( + supervisor.worker(fn(params) { + let #(updater_pubsub, updater_registry) = params + updater.start(username, password, updater_pubsub, updater_registry) + }), + ) + |> supervisor.add( + supervisor.worker(fn(params) { + let #(updater_pubsub, updater_registry) = params + web.init(port, secret_key_base, updater_pubsub, updater_registry) + }), + ) + }) + process.sleep_forever() - // use cookies <- result.try( - // api.login(username, password) |> result.map_error(string.inspect), - // ) - - // io.debug(cookies) - // use prod <- result.try( - // api.get_production(cookies) |> result.map_error(string.inspect), - // ) - // io.debug(prod) - - // use ids <- result.try( - // io.debug(api.get_module_ids(cookies)) |> result.map_error(string.inspect), - // ) - // use module_data <- result.try( - // api.get_module_power(cookies, ids) |> result.map_error(string.inspect), - // ) - // io.debug(module_power.decode_api_data(module_data)) - // |> result.map_error(string.inspect) }