valproxy/index.js

307 lines
6.4 KiB
JavaScript

import { Socket, createSocket } from "node:dgram";
import { argv } from "node:process";
/**
 * Idle period after which a connection's idle callback fires.
 *
 * Unit: milliseconds (2 minutes).
 */
const IDLE_TIMEOUT = 120_000;

/**
 * Delay between a target-socket error and the next attempt with a fresh socket.
 *
 * Unit: milliseconds (1 second).
 */
const RETRY_INTERVAL = 1_000;

/**
 * Maximum time the target may stay unreachable.
 *
 * Because UDP is connectionless, "reachable" only means that the send call
 * did not report an error.
 *
 * Unit: milliseconds (5 minutes).
 */
const WRITE_TIMEOUT = 300_000;

/**
 * Socket family used for every socket this module creates.
 *
 * @type {"udp4" | "udp6"}
 */
const SOCKET_TYPE = "udp4";

/**
 * File descriptor handed over by systemd; the listening socket binds to it.
 */
const SYSTEMD_READ_SOCKET_FD = 3;
/**
 * Immutable (address, port) pair identifying one UDP endpoint.
 *
 * Instances are frozen at the end of construction, so neither field can be
 * reassigned afterwards.
 */
class Target {
  /** @type {string} */
  address;

  /** @type {number} */
  port;

  /**
   * @param {string} address Endpoint address.
   * @param {number} port Endpoint port.
   */
  constructor(address, port) {
    Object.assign(this, { address, port });
    Object.freeze(this);
  }
}
/**
 * Map keyed by the *value* of a target (its address and port) rather than by
 * object identity, so two distinct objects describing the same endpoint hit
 * the same entry.
 *
 * Entries live in a private two-level map (address -> port -> value). The
 * class still extends `Map` for backward compatibility, but the inherited
 * storage is never written to: only the members overridden here (`set`, `get`,
 * `has`, `delete`, `clear`, `size`) reflect the real contents. The inherited
 * iteration helpers (`keys`, `values`, `entries`, `forEach`) are NOT
 * overridden and therefore always see an empty map.
 *
 * @template T
 */
class TargetMap extends Map {
  /**
   * address -> (port -> value)
   *
   * @type {Map<string, Map<number, T>>}
   */
  #data = new Map();

  /**
   * Stores `value` for the endpoint described by `target`.
   *
   * @param {{address: string, port: number}} target
   * @param {T} value
   * @returns {this} The map itself, matching the `Map.prototype.set` contract.
   */
  set(target, value) {
    let portMap = this.#data.get(target.address);
    if (!portMap) {
      portMap = new Map();
      this.#data.set(target.address, portMap);
    }
    portMap.set(target.port, value);
    return this;
  }

  /**
   * @param {{address: string, port: number}} target
   * @returns {T | undefined} The stored value, or `undefined` when absent.
   */
  get(target) {
    return this.#data.get(target.address)?.get(target.port);
  }

  /**
   * @param {{address: string, port: number}} target
   * @returns {boolean} Whether an entry exists for this endpoint.
   */
  has(target) {
    return this.#data.get(target.address)?.has(target.port) ?? false;
  }

  /**
   * Removes the entry for `target`, dropping the per-address map once it
   * becomes empty so abandoned addresses do not accumulate.
   *
   * @param {{address: string, port: number}} target
   * @returns {boolean} `true` if an entry was removed, `false` otherwise.
   */
  delete(target) {
    const portMap = this.#data.get(target.address);
    if (!portMap) {
      return false;
    }
    const removed = portMap.delete(target.port);
    if (portMap.size === 0) {
      this.#data.delete(target.address);
    }
    return removed;
  }

  /** Removes every entry. */
  clear() {
    this.#data.clear();
  }

  /**
   * Total number of stored (address, port) entries.
   *
   * @returns {number}
   */
  get size() {
    let total = 0;
    for (const portMap of this.#data.values()) {
      total += portMap.size;
    }
    return total;
  }
}
/**
 * State for one proxied client: the client's address/port (`source`), the
 * dedicated socket used to talk to the target on that client's behalf, and a
 * FIFO of messages waiting to be written to the target.
 *
 * Messages are written one at a time. Nothing is written until the target
 * socket has emitted `'connect'`; anything sent earlier (or while a failed
 * socket is being replaced) waits in `msgBuffer` and is flushed once a socket
 * is connected.
 */
class Connection {
  /** @type {Target} */
  source;
  /**
   * Socket currently used to reach the target, or `null` while a failed
   * socket is waiting to be replaced or after the connection was disposed.
   *
   * @type {Socket | null}
   */
  writeSocket;
  /** @type {Buffer[]} */
  msgBuffer = [];
  /** @type {NodeJS.Timeout | null} */
  idleTimeout = null;
  /** @type {() => void} */
  idleTimeoutCallback;
  /** @type {NodeJS.Timeout | null} */
  retryTimeout = null;
  /** @type {NodeJS.Timeout | null} */
  reachabilityTimeout = null;
  /** `true` while a write to the target is in flight. */
  writing = false;
  /** `true` once the current `writeSocket` has emitted `'connect'`. */
  #connected = false;

  /**
   * @param {Target} source Client endpoint this connection proxies for.
   * @param {Socket} writeSocket Fresh, not-yet-connected socket for the target.
   * @param {() => void} idleTimeoutCallback Invoked after the connection has
   *   been idle for `IDLE_TIMEOUT` ms and its resources have been released.
   */
  constructor(source, writeSocket, idleTimeoutCallback) {
    this.source = source;
    this.writeSocket = writeSocket;
    this.idleTimeoutCallback = idleTimeoutCallback;
  }

  /**
   * Wires up the event handlers of the current `writeSocket`, starts
   * connecting it to the target and (re)arms the idle timer.
   *
   * Every handler captures the socket it was registered on and compares it
   * with `this.writeSocket`, so late events from a socket that has already
   * been replaced or released cannot disturb the current state.
   */
  initTargetSocket() {
    clearTimeout(this.retryTimeout);
    this.retryTimeout = null;
    this.#connected = false;

    const socket = this.writeSocket;

    socket.on("connect", () => {
      console.log(
        "Opened new connection to target from",
        this.source.address,
        this.source.port
      );
      if (socket !== this.writeSocket) {
        return;
      }
      this.#connected = true;
      // Deliver whatever was queued while the socket was still connecting.
      this.#flush();
    });

    socket.on("message", (fromMsg) => {
      if (socket === this.writeSocket) {
        this.#resetIdleTimeout();
      }
      console.log(
        "Got",
        fromMsg.byteLength,
        "bytes from target for",
        this.source.address,
        this.source.port
      );
      READ_SOCKET.send(fromMsg, this.source.port, this.source.address);
    });

    socket.on("error", (err) => {
      // Errors of an already discarded socket need no further recovery.
      if (socket !== this.writeSocket) {
        return;
      }
      this.#writeError(err);
    });

    socket.on("close", () => {
      console.log(
        "Socket for",
        this.source.address,
        this.source.port,
        "closed."
      );
      // A socket closed on purpose (retry / idle cleanup) is no longer the
      // current one; only an unexpected close of the live socket tears the
      // connection down.
      if (socket !== this.writeSocket) {
        return;
      }
      this.#dispose();
      if (CONNECTIONS.get(this.source) === this) {
        CONNECTIONS.delete(this.source);
      }
    });

    socket.connect(TARGET_PORT, TARGET_ADDRESS);
    this.#resetIdleTimeout();
  }

  /**
   * Queues `msg` for the target and tries to write the oldest queued message.
   * Calling it without a message just retries the flush.
   *
   * @param {?Buffer} [msg]
   */
  send(msg) {
    this.#resetIdleTimeout();
    if (msg) {
      this.msgBuffer.push(msg);
    }
    this.#flush();
  }

  /**
   * Writes the oldest buffered message if a connected socket is available and
   * no other write is in flight; otherwise does nothing.
   */
  #flush() {
    const socket = this.writeSocket;
    if (this.writing || !socket || !this.#connected) {
      return;
    }
    const head = this.msgBuffer.shift();
    if (!head) {
      return;
    }
    this.writing = true;
    socket.send(head, (error) => {
      if (socket !== this.writeSocket) {
        // The socket was replaced/released while this write was pending.
        // Keep an unsent message, but leave the new socket's state alone.
        if (error) {
          this.msgBuffer.unshift(head);
        }
        return;
      }
      this.writing = false;
      if (error) {
        this.msgBuffer.unshift(head);
        this.#writeError(error);
        return;
      }
      // A successful write proves the target is (as far as UDP can tell)
      // reachable again.
      clearTimeout(this.retryTimeout);
      clearTimeout(this.reachabilityTimeout);
      this.retryTimeout = null;
      this.reachabilityTimeout = null;
      if (this.msgBuffer.length > 0) {
        // Yield to the event loop between messages instead of recursing.
        setImmediate(() => this.#flush());
      }
    });
  }

  /** Restarts the idle countdown; on expiry resources are released first. */
  #resetIdleTimeout() {
    clearTimeout(this.idleTimeout);
    this.idleTimeout = setTimeout(() => {
      this.#dispose();
      this.idleTimeoutCallback();
    }, IDLE_TIMEOUT);
  }

  /**
   * Detaches and closes the current target socket, if any.
   */
  #releaseSocket() {
    const socket = this.writeSocket;
    this.writeSocket = null;
    this.#connected = false;
    this.writing = false;
    if (!socket) {
      return;
    }
    try {
      socket.close();
    } catch (closeErr) {
      // Closing a socket that is already closed throws; that is harmless here.
      if (closeErr?.code !== "ERR_SOCKET_DGRAM_NOT_RUNNING") {
        console.error(closeErr);
      }
    }
  }

  /** Cancels all timers, drops queued messages and closes the target socket. */
  #dispose() {
    clearTimeout(this.idleTimeout);
    clearTimeout(this.retryTimeout);
    clearTimeout(this.reachabilityTimeout);
    this.idleTimeout = null;
    this.retryTimeout = null;
    this.reachabilityTimeout = null;
    this.msgBuffer.length = 0;
    this.#releaseSocket();
  }

  /**
   * Handles a failure of the target socket: logs it, discards the socket and
   * schedules a replacement after `RETRY_INTERVAL` ms. The first failure also
   * starts the reachability deadline, which is only cancelled by a successful
   * write (or by disposing the connection).
   *
   * @param {Error} err
   */
  #writeError(err) {
    console.error(
      "Target socket error for",
      this.source.address,
      this.source.port
    );
    console.error(err);
    this.#releaseSocket();
    // Never keep more than one retry pending.
    clearTimeout(this.retryTimeout);
    this.retryTimeout = setTimeout(() => {
      this.writeSocket = createSocket(SOCKET_TYPE);
      this.initTargetSocket();
    }, RETRY_INTERVAL);
    if (!this.reachabilityTimeout) {
      this.reachabilityTimeout = setTimeout(() => {
        // Thrown from a timer callback, so nothing in this class catches it.
        // NOTE(review): presumably meant to bring the process down so a
        // supervisor can restart it - confirm.
        throw new Error(
          `Reachability timeout for connection to target from ${this.source.address} ${this.source.port}`
        );
      }, WRITE_TIMEOUT);
    }
  }
}
// Command line: <target address> <target port>.
const TARGET_ADDRESS = argv[2];
const TARGET_PORT = Number.parseInt(argv[3], 10);
if (!TARGET_ADDRESS || Number.isNaN(TARGET_PORT)) {
  throw new Error("Must specify target address and port as arguments.");
}
// Reject ports that could never be connected to, instead of failing later
// inside the message handler when the first datagram arrives.
if (TARGET_PORT < 1 || TARGET_PORT > 65535) {
  throw new RangeError(
    `Target port must be between 1 and 65535, got ${argv[3]}.`
  );
}

/**
 * Live connections, keyed by the client (source) endpoint.
 *
 * @type {TargetMap<Connection>}
 */
const CONNECTIONS = new TargetMap();

/** Socket on which client datagrams arrive and replies are sent back. */
const READ_SOCKET = createSocket(SOCKET_TYPE);

READ_SOCKET.on("listening", () => {
  console.log(
    "Socket listening on",
    READ_SOCKET.address().address,
    READ_SOCKET.address().port
  );
});

// Each distinct client endpoint gets its own Connection (and therefore its
// own socket towards the target), created lazily on the first datagram.
READ_SOCKET.on("message", (toMsg, rinfo) => {
  console.log("Got", toMsg.byteLength, "bytes from", rinfo.address, rinfo.port);
  const source = new Target(rinfo.address, rinfo.port);
  let connection = CONNECTIONS.get(source);
  if (!connection) {
    const socket = createSocket(SOCKET_TYPE);
    const created = new Connection(source, socket, () => {
      console.log("Idle timeout for", source.address, source.port);
      // Only evict the entry if it still refers to this connection, so a
      // newer connection for the same client is never removed by mistake.
      if (CONNECTIONS.get(source) === created) {
        CONNECTIONS.delete(source);
      }
    });
    connection = created;
    connection.initTargetSocket();
    CONNECTIONS.set(source, connection);
  }
  connection.send(toMsg);
});

READ_SOCKET.on("error", (err) => {
  console.error("Systemd read socket error!");
  console.error(err);
});

READ_SOCKET.on("close", () => {
  console.log("Systemd read socket closed.");
});

// Adopt the already-open descriptor instead of binding to an address/port.
READ_SOCKET.bind({
  fd: SYSTEMD_READ_SOCKET_FD,
});