valproxy/index.js

360 lines
7.8 KiB
JavaScript

import { Socket, createSocket } from "node:dgram";
import { argv, exit, env, hrtime } from "node:process";
/**
 * Idle limit for a single client connection, in milliseconds.
 *
 * A connection that has seen no traffic for longer than this is cleaned up. The value is a bigint so that it can be
 * compared directly against differences of `hrtime.bigint()` readings.
 *
 * @type {bigint}
 */
const IDLE_TIMEOUT = 120_000n; // 2 minutes

/**
 * Period, in milliseconds, between checks of a connection against {@link IDLE_TIMEOUT}.
 */
const IDLE_CHECK_INTERVAL = 10_000; // 10 seconds

/**
 * How long, in milliseconds, the proxy stays up while it has no connections at all. Once this much time has passed
 * the proxy shuts itself down.
 */
const TOTAL_IDLE_TIMEOUT = 120_000; // 2 minutes

/**
 * Delay, in milliseconds, before a new target socket is opened after an error.
 */
const RECONNECT_TIMEOUT = 1_000; // 1 second

/**
 * Socket family used for every socket the proxy creates.
 *
 * @type {"udp4" | "udp6"}
 */
const SOCKET_TYPE = "udp4";

/**
 * Number of the file descriptor handed over by Systemd, which the proxy binds its read socket to.
 */
const SYSTEMD_READ_SOCKET_FD = 3;
/**
 * An address/port pair identifying a network endpoint.
 *
 * Instances are frozen at the end of construction, so neither field can be changed afterwards.
 */
class Target {
  /**
   * Address of the endpoint.
   * @type {string}
   */
  address;

  /**
   * Port of the endpoint.
   * @type {number}
   */
  port;

  /**
   * @param {string} address
   * @param {number} port
   */
  constructor(address, port) {
    Object.assign(this, { address, port });
    Object.freeze(this);
  }
}
/**
 * A convenience over a 2-dimensional map: values are stored under a target's address first and its port second.
 *
 * Only the `address` and `port` properties of a key are read, so two distinct key objects carrying the same address
 * and port refer to the same entry.
 *
 * The members that are not overridden here (`size`, `keys()`, `values()`, iteration, ...) still operate on the outer,
 * per-address level: `size` is the number of distinct addresses and `values()` yields the per-address port maps.
 *
 * @template T
 * @extends {Map<string, Map<number, T>>}
 */
class TargetMap extends Map {
  /**
   * Store `value` for `target`, replacing any value already stored for the same address and port.
   *
   * @param {Target} target
   * @param {T} value
   * @returns {this} The map itself, as with `Map.prototype.set`, so calls can be chained.
   */
  set(target, value) {
    let portMap = super.get(target.address);
    if (!portMap) {
      portMap = new Map();
      super.set(target.address, portMap);
    }
    portMap.set(target.port, value);
    return this;
  }

  /**
   * @param {Target} target
   * @returns {T | undefined} The value stored for the target's address and port, if any.
   */
  get(target) {
    return super.get(target.address)?.get(target.port);
  }

  /**
   * Without this override the inherited `has` would look the target object up among the address keys and could
   * never find it.
   *
   * @param {Target} target
   * @returns {boolean} Whether a value is stored for the target's address and port.
   */
  has(target) {
    return super.get(target.address)?.has(target.port) ?? false;
  }

  /**
   * Remove the value stored for `target`. When that was the last port for its address, the address entry is removed
   * as well, so `size` drops back to zero once the map holds no values.
   *
   * @param {Target} target
   * @returns {boolean} `true` if a value was removed, `false` if there was none, as with `Map.prototype.delete`.
   */
  delete(target) {
    const portMap = super.get(target.address);
    if (!portMap) {
      return false;
    }
    const removed = portMap.delete(target.port);
    if (portMap.size === 0) {
      super.delete(target.address);
    }
    return removed;
  }
}
/**
 * A connection from a remote client to our target.
 *
 * Every connection owns a dedicated UDP socket towards the target, so replies arriving on that socket can be handed
 * back to the right client through {@link Connection#onMessage}. The connection tracks its own activity and, once it
 * has been idle for longer than {@link IDLE_TIMEOUT}, tears itself down (see {@link Connection#close}) and then
 * invokes {@link Connection#onIdleTimeout}.
 */
class Connection {
  /**
   * The remote client address information.
   * @type {Target}
   */
  source;

  /**
   * The socket used to connect to the target, and to receive messages from it. `null` while there is no usable
   * socket (after an error, after the socket closed, or after the connection was closed).
   * @type {Socket | null}
   */
  writeSocket;

  /**
   * Handle of the periodic idle check.
   * @type {NodeJS.Timeout}
   */
  idleInterval;

  /**
   * Called once, after the connection has been torn down because it was idle for too long.
   * @type {() => void}
   */
  onIdleTimeout;

  /**
   * Called with every message received from the target.
   * @type {(msg: Buffer) => void}
   */
  onMessage;

  /**
   * Handle of the pending reconnect attempt, if one is scheduled.
   * @type {NodeJS.Timeout | null}
   */
  reconnectTimeout = null;

  /** Whether {@link Connection#writeSocket} has emitted `connect` and is usable for sending. */
  connected = false;

  /**
   * `hrtime.bigint()` reading (nanoseconds) of the most recent traffic in either direction.
   * @type {bigint}
   */
  lastActivity;

  /** Set by {@link Connection#close}; a closed connection never reconnects. */
  #closed = false;

  /**
   * @param {Target} source
   * @param {Socket} writeSocket
   * @param {() => void} idleTimeoutCallback
   * @param {(msg: Buffer) => void} onMessage
   */
  constructor(source, writeSocket, idleTimeoutCallback, onMessage) {
    this.source = source;
    this.writeSocket = writeSocket;
    this.onIdleTimeout = idleTimeoutCallback;
    this.onMessage = onMessage;
    this.lastActivity = hrtime.bigint();
    this.idleInterval = setInterval(() => {
      // hrtime is in nanoseconds, IDLE_TIMEOUT in milliseconds.
      const idleMs = (hrtime.bigint() - this.lastActivity) / 1_000_000n;
      if (idleMs > IDLE_TIMEOUT) {
        // Release the socket and timers before telling the owner, otherwise the target socket stays open for the
        // rest of the process lifetime.
        this.close();
        this.onIdleTimeout();
      }
    }, IDLE_CHECK_INTERVAL);
  }

  /**
   * Initialise {@link writeSocket} by setting up the event handlers and connecting it to target.
   *
   * Does nothing when there is no socket to initialise or the connection has been closed.
   */
  initTargetSocket() {
    this.connected = false;
    clearTimeout(this.reconnectTimeout);
    this.reconnectTimeout = null;

    const socket = this.writeSocket;
    if (this.#closed || !socket) {
      return;
    }
    // Handlers stay attached to a socket after it has been replaced, so each one checks that it still speaks for
    // the socket currently in use before touching shared state.
    const isCurrent = () => this.writeSocket === socket;

    socket.on("connect", () => {
      if (!isCurrent()) {
        return;
      }
      this.connected = true;
      this.lastActivity = hrtime.bigint();
      console.log(
        "Opened new connection to target from",
        this.source.address,
        this.source.port
      );
    });
    socket.on("message", (fromMsg) => {
      this.lastActivity = hrtime.bigint();
      if (DEBUG) {
        console.log(
          "Got",
          fromMsg.byteLength,
          "bytes from target for",
          this.source.address,
          this.source.port
        );
      }
      this.onMessage(fromMsg);
    });
    socket.on("error", (err) => {
      this.#writeError(err, socket);
    });
    socket.on("close", () => {
      console.log(
        "Socket for",
        this.source.address,
        this.source.port,
        "closed."
      );
      // A socket that was already replaced must not wipe out its successor.
      if (isCurrent()) {
        this.writeSocket = null;
        this.connected = false;
      }
    });
    socket.connect(TARGET_PORT, TARGET_ADDRESS);
  }

  /**
   * Send a message to target using our socket.
   *
   * If the socket is down, the message is discarded. NOTE: the socket only counts as up once it has emitted
   * `connect`, so a message sent before that event (for example right after {@link Connection#initTargetSocket})
   * is discarded as well.
   *
   * @param {Buffer} msg
   */
  send(msg) {
    this.lastActivity = hrtime.bigint();
    const socket = this.writeSocket;
    if (!socket || !this.connected) {
      return;
    }
    socket.send(msg, (err) => {
      if (err) {
        this.#writeError(err, socket);
      }
    });
  }

  /**
   * Permanently shut the connection down: stop the idle check, cancel a pending reconnect and close the target
   * socket. Calling it again has no effect.
   */
  close() {
    if (this.#closed) {
      return;
    }
    this.#closed = true;
    clearInterval(this.idleInterval);
    clearTimeout(this.reconnectTimeout);
    this.reconnectTimeout = null;
    this.connected = false;

    const socket = this.writeSocket;
    this.writeSocket = null;
    if (socket) {
      this.#closeSocket(socket);
    }
  }

  /**
   * Log an error of a target socket and, if it concerns the socket currently in use, close that socket and schedule
   * a single reconnect after {@link RECONNECT_TIMEOUT}.
   *
   * @param {Error} err
   * @param {Socket} socket The socket the error belongs to.
   */
  #writeError(err, socket) {
    console.error(
      "Target socket error for",
      this.source.address,
      this.source.port
    );
    console.error(err);

    // Several sends can fail on the same socket; only the first failure of the current socket may trigger a
    // reconnect, otherwise every failure would open yet another socket.
    if (this.#closed || socket !== this.writeSocket) {
      return;
    }
    this.connected = false;
    this.writeSocket = null;
    this.#closeSocket(socket);

    clearTimeout(this.reconnectTimeout);
    this.reconnectTimeout = setTimeout(() => {
      this.reconnectTimeout = null;
      if (this.#closed) {
        return;
      }
      this.writeSocket = createSocket(SOCKET_TYPE);
      this.initTargetSocket();
    }, RECONNECT_TIMEOUT);
  }

  /**
   * Close `socket`, tolerating the case that it is not running any more.
   *
   * @param {Socket} socket
   */
  #closeSocket(socket) {
    try {
      socket.close();
    } catch (closeErr) {
      // close() throws on a socket that is already closed; that is the state we want, so only report anything else.
      if (closeErr?.code !== "ERR_SOCKET_DGRAM_NOT_RUNNING") {
        console.error(
          "Failed to close target socket for",
          this.source.address,
          this.source.port
        );
        console.error(closeErr);
      }
    }
  }
}
/**
 * Log how many client connections are currently hosted.
 *
 * Iterating {@link CONNECTIONS} yields one port map per client address, so the total is the sum of the sizes of
 * those maps.
 */
function logConnections() {
  const total = Array.from(CONNECTIONS.values()).reduce(
    (sum, ports) => sum + ports.size,
    0
  );
  console.log("Now hosting", total, "connections.");
}
/**
 * (Re)start the countdown after which the proxy shuts itself down.
 *
 * Any countdown already stored in {@link TOTAL_IDLE_TIMER} is cancelled first. When the new one elapses after
 * {@link TOTAL_IDLE_TIMEOUT} milliseconds, a message is logged and the process exits with status 0.
 */
function startTotalIdleTimer() {
  const shutDown = () => {
    const idleSeconds = TOTAL_IDLE_TIMEOUT / 1000;
    console.log(
      "Idle with no connections for",
      idleSeconds,
      "seconds, shutting down."
    );
    exit(0);
  };

  clearTimeout(TOTAL_IDLE_TIMER);
  TOTAL_IDLE_TIMER = setTimeout(shutDown, TOTAL_IDLE_TIMEOUT);
}
/** Address of the target, taken from the first command line argument. */
const TARGET_ADDRESS = argv[2];
/** Port of the target, taken from the second command line argument. */
const TARGET_PORT = Number.parseInt(argv[3], 10);
/** Whether per-datagram logging is enabled; only the exact value "true" of the DEBUG environment variable counts. */
const DEBUG = env.DEBUG === "true";
// Reject unusable ports here, at startup. Connecting a socket to a port outside 1-65535 throws, which would
// otherwise only surface once the first datagram arrives. A missing or non-numeric port parses to NaN, which
// Number.isInteger rejects as well.
const isValidPort =
  Number.isInteger(TARGET_PORT) && TARGET_PORT >= 1 && TARGET_PORT <= 65535;
if (!TARGET_ADDRESS || !isValidPort) {
  throw new Error("Must specify target address and port as arguments.");
}
/**
 * Every live connection, keyed by the address and port of its client.
 *
 * @type {TargetMap<Connection>}
 */
const CONNECTIONS = new TargetMap();

/**
 * Handle of the countdown started by {@link startTotalIdleTimer}, after which the proxy shuts down.
 *
 * @type {NodeJS.Timeout | null}
 */
let TOTAL_IDLE_TIMER = null;

/** The socket on which datagrams from clients arrive and through which replies are sent back to them. */
const READ_SOCKET = createSocket(SOCKET_TYPE);

// There are no connections yet, so the shutdown countdown runs from the very start.
startTotalIdleTimer();

READ_SOCKET.on("listening", () => {
  const { address, port } = READ_SOCKET.address();
  console.log("Socket listening on", address, port);
});
// Forward every datagram received from a client to the target, through the connection belonging to that client.
READ_SOCKET.on("message", (toMsg, rinfo) => {
  // A dual stack mapped address carries a prefix that has to be stripped off, otherwise our replies will not be
  // delivered to the origin.
  const mappedPrefix = "::ffff:";
  const raddress = rinfo.address.startsWith(mappedPrefix)
    ? rinfo.address.slice(mappedPrefix.length)
    : rinfo.address;

  if (DEBUG) {
    console.log("Got", toMsg.byteLength, "bytes from", raddress, rinfo.port);
  }

  const source = new Target(raddress, rinfo.port);
  const existing = CONNECTIONS.get(source);
  if (existing) {
    existing.send(toMsg);
    return;
  }

  // First datagram from this client address/port pair: give it a connection of its own. That is what lets the
  // replies from the target be routed back to the correct client address/port and thus the correct remote client.
  clearTimeout(TOTAL_IDLE_TIMER);

  const forgetConnection = () => {
    console.log("Idle timeout for", source.address, source.port);
    CONNECTIONS.delete(source);
    logConnections();
    // With the last connection gone, the shutdown countdown starts again.
    if (CONNECTIONS.size === 0) {
      startTotalIdleTimer();
    }
  };
  const relayToClient = (fromMsg) => {
    READ_SOCKET.send(fromMsg, source.port, source.address);
  };

  const connection = new Connection(
    source,
    createSocket(SOCKET_TYPE),
    forgetConnection,
    relayToClient
  );
  connection.initTargetSocket();
  CONNECTIONS.set(source, connection);
  logConnections();
  connection.send(toMsg);
});
// Errors and closure of the read socket are only reported; nothing else reacts to them here.
READ_SOCKET.on("error", (err) => {
  console.error("Systemd read socket error!");
  console.error(err);
}).on("close", () => {
  console.log("Systemd read socket closed.");
});

// Systemd is already listening to packets for us, so rather than opening a socket of our own the read socket is
// bound to the file descriptor Systemd hands over.
const systemdBinding = { fd: SYSTEMD_READ_SOCKET_FD };
READ_SOCKET.bind(systemdBinding);