const express = require("express");
const crypto = require("crypto");
const axios = require("axios");

// ---------------------------------------------------------------------------
// Configuration (read from the environment once, at startup)
// ---------------------------------------------------------------------------

/** TCP port the HTTP server listens on. */
const PORT = Number.parseInt(process.env.PORT, 10);
// NOTE(review): the upper bound of 10000 excludes valid TCP ports up to 65535;
// kept as-is because it may be a deliberate deployment restriction — confirm.
if (Number.isNaN(PORT) || PORT <= 0 || PORT > 10000) {
  console.log("PORT env variable is missing or invalid");
  process.exit(1);
}

/** Base URL under which this node is reachable; sent to peers via `/notify`. */
const HOST = process.env.HOST;
if (typeof HOST !== "string" || !HOST.trim()) {
  console.log("HOST env variable missing");
  process.exit(1);
}

/** Comma-separated list of peer base URLs to contact on startup. */
const BOOTSTRAP = (process.env.BOOTSTRAP || "")
  .split(",")
  .map((entry) => entry.trim())
  .filter(Boolean);

/** Width of node ids / keys in bits. */
const KEYSIZE = 160n;
/** Maximum number of remote nodes kept per bucket. */
const BUCKETSIZE = 10;
/** Stored entries older than this many minutes are dropped. */
const TTL_MINUTES = 15;
/** Number of peers contacted concurrently per batch. */
const ALPHA_VALUE = 10;
/** Maximum time to wait for an incoming request body. */
const REQ_TIMEOUT_MS = 3000;
/** Timeout for the liveness ping issued by `RemoteNode#isAlive`. */
const ALIVE_TIMEOUT_MS = 1000;
/** Maximum size (in bytes) of a stored value. */
const MAX_VALUE_LENGTH = 2048;
/** Upper bound on the raw (base64) body accepted by `POST /put/:key`. */
const MAX_PUT_BODY_BYTES = MAX_VALUE_LENGTH * 2;
/** Upper bound on the raw body accepted by `POST /notify`. */
const MAX_NOTIFY_BODY_BYTES = 2048;

/** A key / node id on the wire: exactly 40 hexadecimal digits. */
const KEY_HEX_PATTERN = /^[0-9a-fA-F]{40}$/;

/**
 * Half-open distance ranges `[2^i, 2^(i+1))`, one per bucket.
 * @type {[bigint, bigint][]}
 */
const bucketRanges = Array.from({ length: Number(KEYSIZE) }, (_, i) => [
  2n ** BigInt(i),
  2n ** BigInt(i + 1),
]);

/**
 * Throws if `cond` is falsy.
 * @param {unknown} cond
 * @returns {void}
 * @throws {Error} When the condition does not hold
 */
function assert(cond) {
  if (!cond) throw new Error("Assertion failed!");
}

/**
 * Chooses `count` many random elements from `arr`;
 * Does not need to randomize element order
 * @template {any} T
 * @param {T[]} arr
 * @param {number} count
 * @returns {T[]} A new array; the input is never mutated
 */
function chooseRandom(arr, count) {
  assert(Array.isArray(arr));
  assert(typeof count === "number");
  const pool = [...arr];
  const wanted = count > pool.length ? pool.length : count;
  if (wanted === pool.length) return pool;
  const picked = [];
  while (picked.length < wanted) {
    const index = Math.floor(Math.random() * pool.length);
    // splice removes the element so it cannot be picked twice
    picked.push(...pool.splice(index, 1));
  }
  return picked;
}

/**
 * Parses a 40-hex-digit key as sent in a URL parameter.
 * @param {unknown} keyHex
 * @returns {bigint}
 * @throws {Error} When `keyHex` is not exactly 40 hexadecimal digits
 */
function parseKeyHex(keyHex) {
  assert(typeof keyHex === "string" && KEY_HEX_PATTERN.test(keyHex));
  return BigInt(`0x${keyHex}`);
}

/**
 * Parses the optional `limit` query parameter.
 * @param {unknown} raw The raw query value
 * @returns {number | null} The limit, or `null` when none was given
 * @throws {Error} When a value is given but is not a non-negative integer
 */
function parseLimit(raw) {
  if (!raw) return null;
  const limit = Number.parseInt(raw, 10);
  assert(!Number.isNaN(limit) && limit >= 0);
  return limit;
}

/**
 * Formats an id as a zero-padded 40-digit hex string.
 * @param {bigint} id
 * @returns {string}
 */
function formatId(id) {
  return id.toString(16).padStart(40, "0");
}

/**
 * Collects the body of an incoming request as a trimmed ASCII string.
 * Rejects if the stream errors, is closed before `end`, exceeds `maxBytes`,
 * or does not finish within `REQ_TIMEOUT_MS`.
 * @param {import("http").IncomingMessage} req
 * @param {number} maxBytes
 * @returns {Promise<string>}
 */
function readBody(req, maxBytes) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    let received = 0;
    const fail = () => {
      clearTimeout(timer);
      reject(new Error("request body could not be read"));
    };
    const timer = setTimeout(fail, REQ_TIMEOUT_MS);
    req.on("data", (chunk) => {
      received += chunk.length;
      if (received > maxBytes) {
        fail();
        return;
      }
      chunks.push(chunk);
    });
    req.on("end", () => {
      clearTimeout(timer);
      resolve(Buffer.concat(chunks).toString("ascii").trim());
    });
    req.on("error", fail);
    // "close" also fires after a normal "end"; by then the promise is settled
    // and the extra reject is a no-op.
    req.on("close", fail);
  });
}

/**
 * Strips every trailing `/` from a host / base URL.
 * @param {string} host
 * @returns {string}
 */
function stripTrailingSlashes(host) {
  let end = host.length;
  while (end > 0 && host[end - 1] === "/") end -= 1;
  return host.substring(0, end);
}

/**
 * Extracts the node id from a `/ping` response.
 * @param {{status: number, data: unknown}} res The axios response
 * @returns {bigint | null} The id, or `null` if the response is not a valid ping reply
 * @throws {SyntaxError} When the 40-character payload is not valid hex
 */
function parsePingId(res) {
  if (res.status !== 242) return null;
  if (typeof res.data !== "string") return null;
  const hex = res.data.trim();
  if (hex.length !== 40) return null;
  return BigInt(`0x${hex}`);
}

/**
 * The node run by this process: keeps the known remote nodes (grouped into
 * buckets by XOR distance to the own id), the stored entries, and exposes
 * both over HTTP.
 */
class LocalNode {
  /** @type {bigint} @readonly */
  id;
  /** @private @type {RemoteNode[][]} @readonly */
  buckets;
  /** @private @type {Map<bigint, [Date, Uint8Array][]>} @readonly */
  storage;
  /** @private @type {boolean} */
  isGarbageCollectingBuckets = false;

  /**
   * @param {bigint} id
   */
  constructor(id) {
    assert(typeof id === "bigint");
    this.id = id;
    this.buckets = Array.from({ length: bucketRanges.length }, () => []);
    this.storage = new Map();
  }

  /**
   * Appends `value` (timestamped with the current time) to the entries of `key`.
   * @param {bigint} key
   * @param {Uint8Array} value
   * @returns {void}
   * @throws {Error} On wrong argument types or when `value` exceeds `MAX_VALUE_LENGTH`
   */
  put(key, value) {
    assert(typeof key === "bigint" && value instanceof Uint8Array);
    assert(value.byteLength <= MAX_VALUE_LENGTH);
    const entries = this.storage.get(key) ?? [];
    entries.push([new Date(), value]);
    this.storage.set(key, entries);
  }

  /**
   * @param {bigint} key
   * @param {number | null} limit How many entries shall be returned at max
   * @returns {[Date, Uint8Array][]} The entries sorted newest first
   */
  get(key, limit) {
    assert(typeof key === "bigint");
    assert(typeof limit === "number" || limit === null);
    // Sort a copy so the stored array is neither reordered nor handed out.
    const entries = [...(this.storage.get(key) ?? [])].sort(
      ([a], [b]) => b.valueOf() - a.valueOf(),
    );
    if (limit === null) return entries;
    // Clamp: a negative end index would make slice() count from the back.
    return entries.slice(0, Math.max(0, Math.floor(limit)));
  }

  /**
   * @param {bigint} key The key hash
   * @param {number | null} limit How many nodes shall be returned at max
   * @returns {RemoteNode[]} The nodes ordered closest first (relative to `key`)
   */
  findClosestRemoteNodes(key, limit) {
    assert(typeof key === "bigint");
    assert(typeof limit === "number" || limit === null);
    // The comparator must return a Number: sort() converts its result with
    // ToNumber, which throws for BigInt, so the distances are compared
    // instead of subtracted.
    const nodes = this.allRemoteNodes().sort((a, b) => {
      const distA = a.id ^ key;
      const distB = b.id ^ key;
      if (distA < distB) return -1;
      if (distA > distB) return 1;
      return 0;
    });
    if (limit === null) return nodes;
    return nodes.slice(0, Math.max(0, Math.floor(limit)));
  }

  /**
   * @returns {RemoteNode[]} All remote nodes in undefined order
   */
  allRemoteNodes() {
    return this.buckets.flat();
  }

  /**
   * Adds the node to its bucket unless the bucket is full, the node is
   * already known, or no bucket exists for it (e.g. it has our own id).
   * @param {RemoteNode} remoteNode The node to add
   * @returns {void}
   */
  addRemoteNode(remoteNode) {
    assert(remoteNode instanceof RemoteNode);
    const bucket = this._bucketFor(remoteNode.id);
    if (bucket === null || bucket.length >= BUCKETSIZE) return;
    if (bucket.some((node) => node.id === remoteNode.id)) return;
    bucket.push(remoteNode);
  }

  /**
   * Clears `buckets` from all `RemoteNode`s that aren't alive anymore.
   * Nodes are pinged in batches of `ALPHA_VALUE`; a call made while another
   * run is still in progress returns immediately.
   * @returns {Promise<void>}
   */
  async garbageCollectBuckets() {
    if (this.isGarbageCollectingBuckets) return;
    this.isGarbageCollectingBuckets = true;
    try {
      const candidates = this.allRemoteNodes();
      const dead = [];
      for (let offset = 0; offset < candidates.length; offset += ALPHA_VALUE) {
        const batch = candidates.slice(offset, offset + ALPHA_VALUE);
        const alives = await Promise.all(batch.map((node) => node.isAlive()));
        batch.forEach((node, index) => {
          if (!alives[index]) dead.push(node);
        });
      }
      dead.forEach((node) => console.log(`[gc] removing dead ${node}`));
      const deadIds = new Set(dead.map((node) => node.id));
      // Filter the *current* buckets so nodes added while pinging survive.
      this.buckets.forEach((bucket, index) => {
        this.buckets[index] = bucket.filter((node) => !deadIds.has(node.id));
      });
    } finally {
      this.isGarbageCollectingBuckets = false;
    }
  }

  /**
   * Removes all entries that have expired (older than `TTL_MINUTES`)
   * @returns {void}
   */
  garbageCollectEntries() {
    const now = Date.now();
    const ttlMs = TTL_MINUTES * 60 * 1000;
    for (const [key, entries] of [...this.storage.entries()]) {
      const fresh = entries.filter(([date]) => now - date.getTime() <= ttlMs);
      if (fresh.length) {
        this.storage.set(key, fresh);
      } else {
        this.storage.delete(key);
      }
    }
  }

  /**
   * Bootstrap the node by asking for existing nodes.
   * Hosts are processed in consecutive batches of `ALPHA_VALUE`.
   * @param {string[]} hosts The hosts/nodes to ask
   * @returns {Promise<void>}
   */
  async bootstrap(hosts) {
    for (let offset = 0; offset < hosts.length; offset += ALPHA_VALUE) {
      const batch = hosts
        .slice(offset, offset + ALPHA_VALUE)
        .filter((host) => host);
      await Promise.all(batch.map((host) => this._bootstrapFromHost(host)));
    }
  }

  /**
   * Contacts one bootstrap host: adds it, adds up to 30 randomly chosen
   * nodes it advertises, and finally announces our own `HOST` to it.
   * @private
   * @param {string} host
   * @returns {Promise<void>}
   */
  async _bootstrapFromHost(host) {
    console.log("[bootstrap] checking", host);
    const node = await RemoteNode.fromHost(host);
    if (!node) return;
    console.log(`[bootstrap] adding ${node}`);
    this.addRemoteNode(node);
    const advertised = await node.find(); // `null` on any error
    if (advertised) {
      await Promise.all(
        chooseRandom(advertised, 30).map(([idHex, peerHost]) =>
          this._addAdvertisedPeer(idHex, peerHost),
        ),
      );
    }
    // Fire-and-forget: notify() handles its own failures and never rejects.
    void node.notify(HOST);
  }

  /**
   * Adds a peer learned from another node, but only if the peer itself
   * confirms the advertised id when pinged.
   * @private
   * @param {string} idHex The advertised id (hex)
   * @param {string} peerHost The advertised base URL
   * @returns {Promise<void>}
   */
  async _addAdvertisedPeer(idHex, peerHost) {
    let advertisedId;
    try {
      advertisedId = BigInt(`0x${idHex}`);
    } catch (_) {
      // Best effort: a malformed id from a remote peer is simply skipped.
      return;
    }
    if (advertisedId === this.id) return;
    const peer = await RemoteNode.fromHost(peerHost);
    if (!peer || peer.id !== advertisedId) return;
    console.log(`[bootstrap] adding ${peer}`);
    this.addRemoteNode(peer);
  }

  /**
   * Builds the HTTP API of this node:
   * `GET /ping`, `GET /get/:key`, `POST /put/:key`, `GET /find`,
   * `GET /find/:key` and `POST /notify`.
   * @returns {express.Express}
   */
  createServer() {
    const server = express();

    // Liveness probe; replies with our id using the custom status 242.
    server.get("/ping", (req, res) => {
      res.status(242).send(formatId(this.id));
    });

    // Returns the entries stored under `key` as [utcDate, base64Value] pairs.
    server.get("/get/:key", (req, res) => {
      let key;
      let limit;
      try {
        key = parseKeyHex(req.params.key);
        limit = parseLimit(req.query.limit);
      } catch (_) {
        res.status(400).end();
        return;
      }
      console.log(`[/get/:key] key = ${key.toString(16)}`);
      const data = this.get(key, limit).map(([date, value]) => [
        date.toUTCString(),
        Buffer.from(value).toString("base64"),
      ]);
      res.status(200).json(data);
    });

    // Stores the base64-encoded request body under `key`.
    server.post("/put/:key", async (req, res) => {
      let key;
      let data;
      try {
        key = parseKeyHex(req.params.key);
        const b64Text = await readBody(req, MAX_PUT_BODY_BYTES);
        data = new Uint8Array(Buffer.from(b64Text, "base64"));
        assert(data.byteLength <= MAX_VALUE_LENGTH);
      } catch (_) {
        res.status(400).end();
        return;
      }
      try {
        console.log(`[/put/:key] key = ${key.toString(16)}`);
        this.put(key, data);
        res.status(200).end();
      } catch (_) {
        res.status(500).end();
      }
    });

    // Lists every known remote node as [idHex, host].
    server.get("/find", (req, res) => {
      const data = this.allRemoteNodes().map((node) => [
        formatId(node.id),
        node.host,
      ]);
      res.status(200).json(data);
    });

    // Lists the known remote nodes closest to `key`, closest first.
    server.get("/find/:key", (req, res) => {
      let key;
      let limit;
      try {
        key = parseKeyHex(req.params.key);
        limit = parseLimit(req.query.limit);
      } catch (_) {
        res.status(400).end();
        return; // without this the handler would continue with `key` unset
      }
      console.log(`[/find/:key] key = ${key.toString(16)}`);
      const data = this.findClosestRemoteNodes(key, limit).map((node) => [
        formatId(node.id),
        node.host,
      ]);
      res.status(200).json(data);
    });

    // A peer announces its base URL (plain-text body); we ping it and add it.
    server.post("/notify", async (req, res) => {
      try {
        const host = await readBody(req, MAX_NOTIFY_BODY_BYTES);
        console.log("[/notify] checking", host);
        const node = await RemoteNode.fromHost(host);
        if (node) {
          console.log(`[/notify] adding ${node}`);
          this.addRemoteNode(node);
        }
      } catch (_) {
        // Deliberately ignored: notifications are best effort.
      }
      // Always signal success because we don't care
      res.status(200).end();
    });

    // Express only treats a middleware as an error handler when it declares
    // all four parameters, hence the unused `next`.
    // eslint-disable-next-line no-unused-vars
    server.use((err, req, res, next) => {
      console.error(err);
      res.status(500).end();
    });

    return server;
  }

  /**
   * @param {bigint} id
   * @returns {boolean} Whether a remote node with this id is in our buckets
   */
  _knows(id) {
    const bucket = this._bucketFor(id);
    // No bucket exists for our own id (distance 0) or out-of-range ids.
    if (bucket === null) return false;
    return bucket.some((node) => node.id === id);
  }

  /**
   * Finds the bucket whose distance range contains `this.id XOR key`.
   * @private
   * @param {bigint} key
   * @returns {RemoteNode[] | null} `null` when no range matches (e.g. distance 0)
   */
  _bucketFor(key) {
    assert(typeof key === "bigint");
    const distance = this.id ^ key;
    const bucketIndex = bucketRanges.findIndex(
      ([start, end]) => distance >= start && distance < end,
    );
    if (bucketIndex < 0) return null;
    return this.buckets.at(bucketIndex) ?? null;
  }
}

/**
 * Handle for another node, identified by its id and reachable via the HTTP
 * API at `host`.
 */
class RemoteNode {
  /** @type {bigint} @readonly */
  id;
  /** @type {string} @readonly */
  host;

  /**
   * @param {bigint} id
   * @param {string} host Base URL; trailing slashes are removed
   */
  constructor(id, host) {
    assert(typeof id === "bigint" && typeof host === "string");
    this.id = id;
    this.host = stripTrailingSlashes(host);
  }

  /**
   * Pings the node and checks that it still answers with the expected id.
   * @returns {Promise<boolean>} `false` on any error, timeout or id mismatch
   */
  async isAlive() {
    try {
      const res = await axios({
        method: "get",
        url: `${this.host}/ping`,
        timeout: ALIVE_TIMEOUT_MS,
      });
      const id = parsePingId(res);
      return id !== null && id === this.id;
    } catch (_) {
      return false;
    }
  }

  /**
   * Notifies the remote node about the given `host`.
   * Best effort: failures are ignored and the promise never rejects.
   * @param {string} host
   * @returns {Promise<void>}
   */
  async notify(host) {
    try {
      await axios({
        method: "post",
        url: `${this.host}/notify`,
        data: host,
        headers: {
          "Content-Type": "text/plain",
        },
        timeout: 5000,
      });
    } catch (_) {
      // Deliberately ignored: the peer may be gone or unwilling.
    }
  }

  /**
   * Asks the remote node for all nodes it knows (`GET /find`).
   * @returns {Promise<[string, string][] | null>} The parsed JSON result
   *   (`[idHex, host]` pairs); `null` on error or malformed response
   */
  async find() {
    try {
      const res = await axios({
        method: "get",
        url: `${this.host}/find`,
        timeout: 3000,
      });
      assert(res.status === 200);
      assert(Array.isArray(res.data));
      for (const entry of res.data) {
        assert(Array.isArray(entry));
        assert(entry.length === 2);
        assert(typeof entry[0] === "string");
        assert(typeof entry[1] === "string");
      }
      return res.data;
    } catch (_) {
      return null;
    }
  }

  toString() {
    return `${this.host} (${this.id.toString(16)})`;
  }

  /**
   * Pings `host` and, if it answers like a node, wraps it in a `RemoteNode`.
   * @param {string} host
   * @returns {Promise<RemoteNode | null>} `null` if the host is unreachable
   *   or does not return a valid ping response (the error is logged)
   */
  static async fromHost(host) {
    try {
      const baseUrl = stripTrailingSlashes(host);
      const res = await axios({
        method: "get",
        url: `${baseUrl}/ping`,
        timeout: 2000,
      });
      const id = parsePingId(res);
      return id === null ? null : new RemoteNode(id, baseUrl);
    } catch (e) {
      console.error(e);
      return null;
    }
  }
}

// ---------------------------------------------------------------------------
// Startup
// ---------------------------------------------------------------------------

// Fresh random 160-bit id on every start.
const id = BigInt(
  `0x${crypto.createHash("sha1").update(crypto.randomBytes(20)).digest("hex")}`,
);
const node = new LocalNode(id);

// Drop expired entries every 5 seconds.
(async () => {
  while (true) {
    node.garbageCollectEntries();
    await new Promise((res) => setTimeout(res, 5000));
  }
})();

// Drop dead remote nodes; the 5 second pause starts after each run finishes.
(async () => {
  while (true) {
    await node.garbageCollectBuckets();
    await new Promise((res) => setTimeout(res, 5000));
  }
})();

node.createServer().listen(PORT, () => {
  console.log("[server] listening on port", PORT);
  node.bootstrap(BOOTSTRAP).catch((err) => console.error(err));
});