Supervise everything

This commit is contained in:
Mikko Ahlroth 2024-07-06 11:02:43 +03:00
parent fede52dd60
commit f8b6e1af9e
6 changed files with 120 additions and 45 deletions

View file

@ -0,0 +1,25 @@
import gleam/dynamic
import gleam/erlang/process
import gleam/otp/actor
import gleam/result
import glubsub
pub type PubSubSubject(msg) =
process.Subject(glubsub.Message(msg))
pub type PubSub(msg) =
glubsub.Topic(msg)
pub fn init() -> actor.StartResult(glubsub.Message(msg)) {
use topic <- result.try(
glubsub.new_topic()
|> result.map_error(fn(err) { actor.InitCrashed(dynamic.from(err)) }),
)
let glubsub.Topic(subject) = topic
Ok(subject)
}
pub fn from_subject(subject: PubSubSubject(msg)) {
glubsub.Topic(subject)
}

View file

@ -0,0 +1,33 @@
import chip
import gleam/erlang/process
import gleam/otp/actor
import gleam/otp/supervisor
pub type Id {
Updater
}
pub type Store(message) =
chip.Registry(message, Id, Nil)
pub fn start() -> Result(Store(message), actor.StartError) {
chip.start()
}
pub fn childspec() {
supervisor.worker(fn(_index) { start() })
}
pub fn register(
store: Store(message),
id: Id,
subject: process.Subject(message),
) {
chip.new(subject)
|> chip.tag(id)
|> chip.register(store, _)
}
pub fn get(store: Store(message), id: Id) {
chip.find(store, id)
}

View file

@ -1,5 +1,6 @@
import ap_systems/api import ap_systems/api
import ap_systems/module_power import ap_systems/module_power
import aurinko/registry
import aurinko/updater/pubsub import aurinko/updater/pubsub
import aurinko/updater/types.{ import aurinko/updater/types.{
type Dataset, type ModuleDataset, Dataset, ModuleDataset, ModulePower, type Dataset, type ModuleDataset, Dataset, ModuleDataset, ModulePower,
@ -19,7 +20,7 @@ import gleam/result
import gleamy/red_black_tree_set.{type Set} import gleamy/red_black_tree_set.{type Set}
import glubsub import glubsub
const update_interval = 10_000 const update_interval = 30_000
const login_cookie_expiry = 86_400 const login_cookie_expiry = 86_400
@ -76,7 +77,12 @@ pub fn get_data(subject: process.Subject(Message)) {
process.call(subject, fn(reply_subject) { GetData(reply_subject) }, 5000) process.call(subject, fn(reply_subject) { GetData(reply_subject) }, 5000)
} }
pub fn start(username: String, password: String, pubsub: pubsub.UpdaterPubSub) { pub fn start(
username: String,
password: String,
pubsub: pubsub.UpdaterPubSub,
updater_registry: registry.Store(Message),
) {
let spec = let spec =
actor.Spec( actor.Spec(
init: fn() { init: fn() {
@ -84,6 +90,8 @@ pub fn start(username: String, password: String, pubsub: pubsub.UpdaterPubSub) {
let selector = let selector =
process.new_selector() |> process.selecting(self_subject, fn(a) { a }) process.new_selector() |> process.selecting(self_subject, fn(a) { a })
registry.register(updater_registry, registry.Updater, self_subject)
process.send_after(self_subject, 0, PeriodicUpdate)
process.send_after(self_subject, update_interval, PeriodicUpdate) process.send_after(self_subject, update_interval, PeriodicUpdate)
actor.Ready( actor.Ready(
@ -125,7 +133,7 @@ fn handle_message(message: Message, state: State) -> actor.Next(Message, State)
let _ = case new_state.dataset { let _ = case new_state.dataset {
option.Some(dataset) -> option.Some(dataset) ->
glubsub.broadcast(state.pubsub.out, pubsub.NewData(dataset)) glubsub.broadcast(state.pubsub, pubsub.NewData(dataset))
option.None -> Ok(Nil) option.None -> Ok(Nil)
} }

View file

@ -1,16 +1,15 @@
import aurinko/pubsub as generic_pubsub
import aurinko/updater/types import aurinko/updater/types
import gleam/result import gleam/otp/actor
import glubsub import glubsub
pub type OutMessage { pub type OutMessage {
NewData(dataset: types.Dataset) NewData(dataset: types.Dataset)
} }
pub type UpdaterPubSub { pub type UpdaterPubSub =
UpdaterPubSub(out: glubsub.Topic(OutMessage)) generic_pubsub.PubSub(OutMessage)
}
pub fn init() { pub fn init() -> actor.StartResult(glubsub.Message(OutMessage)) {
use out <- result.try(glubsub.new_topic()) generic_pubsub.init()
Ok(UpdaterPubSub(out))
} }

View file

@ -1,5 +1,10 @@
import aurinko/registry
import aurinko/updater
import aurinko/updater/pubsub import aurinko/updater/pubsub
import aurinko/web/router import aurinko/web/router
import gleam/dynamic
import gleam/otp/actor
import gleam/result
import mist import mist
import wisp import wisp
@ -7,12 +12,15 @@ pub fn init(
port: Int, port: Int,
secret_key_base: String, secret_key_base: String,
_updater_pubsub: pubsub.UpdaterPubSub, _updater_pubsub: pubsub.UpdaterPubSub,
_updater_registry: registry.Store(updater.Message),
) { ) {
wisp.configure_logger() wisp.configure_logger()
let assert Ok(_) = wisp.mist_handler(router.handle_request, secret_key_base)
wisp.mist_handler(router.handle_request, secret_key_base) |> mist.new()
|> mist.new() |> mist.port(port)
|> mist.port(port) |> mist.start_http()
|> mist.start_http() |> result.map_error(fn(glisten_error) {
actor.InitCrashed(dynamic.from(glisten_error))
})
} }

View file

@ -1,5 +1,5 @@
import ap_systems/api import aurinko/pubsub as generic_pubsub
import ap_systems/module_power import aurinko/registry
import aurinko/updater import aurinko/updater
import aurinko/updater/pubsub import aurinko/updater/pubsub
import aurinko/web import aurinko/web
@ -7,10 +7,7 @@ import dot_env
import dot_env/env import dot_env/env
import gleam/erlang/process import gleam/erlang/process
import gleam/int import gleam/int
import gleam/io import gleam/otp/supervisor
import gleam/result
import gleam/string
import glubsub
pub fn main() { pub fn main() {
dot_env.new_with_path("./.env") dot_env.new_with_path("./.env")
@ -23,29 +20,34 @@ pub fn main() {
let assert Ok(port) = int.parse(port) let assert Ok(port) = int.parse(port)
let assert Ok(secret_key_base) = env.get("SECRET_KEY_BASE") let assert Ok(secret_key_base) = env.get("SECRET_KEY_BASE")
let assert Ok(updater_pubsub) = pubsub.init() let assert Ok(_supervisor) =
let assert Ok(subject) = updater.start(username, password, updater_pubsub) supervisor.start(fn(children) {
let assert Ok(_) = web.init(port, secret_key_base, updater_pubsub) children
io.debug(subject) |> supervisor.add(
process.sleep(15_000) supervisor.worker(fn(_) { pubsub.init() })
io.debug(updater.get_data(subject)) |> supervisor.returning(fn(_, updater_pubsub_subject) {
generic_pubsub.from_subject(updater_pubsub_subject)
}),
)
|> supervisor.add(
supervisor.worker(fn(_) { registry.start() })
|> supervisor.returning(fn(updater_pubsub, registry) {
#(updater_pubsub, registry)
}),
)
|> supervisor.add(
supervisor.worker(fn(params) {
let #(updater_pubsub, updater_registry) = params
updater.start(username, password, updater_pubsub, updater_registry)
}),
)
|> supervisor.add(
supervisor.worker(fn(params) {
let #(updater_pubsub, updater_registry) = params
web.init(port, secret_key_base, updater_pubsub, updater_registry)
}),
)
})
process.sleep_forever() process.sleep_forever()
// use cookies <- result.try(
// api.login(username, password) |> result.map_error(string.inspect),
// )
// io.debug(cookies)
// use prod <- result.try(
// api.get_production(cookies) |> result.map_error(string.inspect),
// )
// io.debug(prod)
// use ids <- result.try(
// io.debug(api.get_module_ids(cookies)) |> result.map_error(string.inspect),
// )
// use module_data <- result.try(
// api.get_module_power(cookies, ids) |> result.map_error(string.inspect),
// )
// io.debug(module_power.decode_api_data(module_data))
// |> result.map_error(string.inspect)
} }