kademlia-dhtl/index.js

604 lines
14 KiB
JavaScript

const express = require("express");
const crypto = require("crypto");
const axios = require("axios");
// ---------------------------------------------------------------------------
// Runtime configuration, read once from the environment at start-up.
// A missing or invalid PORT/HOST aborts the process with exit code 1.
// ---------------------------------------------------------------------------

// TCP port the HTTP server listens on. Any valid TCP port (1-65535) is
// accepted; an explicit radix keeps strings such as "0x50" from being read as
// hexadecimal.
const PORT = Number.parseInt(process.env.PORT, 10);
if (Number.isNaN(PORT) || PORT <= 0 || PORT > 65535) {
  console.log("PORT env variable is missing or invalid");
  process.exit(1);
}

// Address of this node as announced to peers: it is sent as the body of
// POST /notify, and the receiver appends "/ping" to it to reach us.
const HOST = process.env.HOST;
if (typeof HOST !== "string" || !HOST.trim()) {
  console.log("HOST env variable missing");
  process.exit(1);
}

// Comma-separated list of peers to contact at start-up. Surrounding whitespace
// is trimmed and empty entries (unset variable, stray or trailing commas) are
// dropped, so the result is always an array of non-empty strings.
const BOOTSTRAP = (process.env.BOOTSTRAP ?? "")
  .split(",")
  .map((s) => s.trim())
  .filter((s) => s.length > 0);
// Width of node ids and keys in bits; also the number of routing buckets.
const KEYSIZE = 160n;
// Maximum number of remote nodes kept in one bucket.
const BUCKETSIZE = 10;
// Stored entries older than this are dropped by the entry garbage collector.
const TTL_MINUTES = 15;
// Number of remote nodes/hosts contacted concurrently per batch.
const ALPHA_VALUE = 10;
// Upper bound for receiving an incoming request body.
const REQ_TIMEOUT_MS = 3000;
// Timeout of the liveness ping sent to a remote node.
const ALIVE_TIMEOUT_MS = 1000;
// Maximum size in bytes of a single stored value.
const MAX_VALUE_LENGTH = 2048;

// bucketRanges[i] is the half-open XOR-distance interval [2^i, 2^(i+1))
// covered by bucket i, for i = 0 .. KEYSIZE - 1.
const bucketRanges = Array.from({ length: Number(KEYSIZE) }, (_, bit) => {
  const lower = 1n << BigInt(bit);
  return [lower, lower << 1n];
});
/**
 * Minimal runtime assertion used for argument and input validation.
 * @param {unknown} cond Any value; a falsy value counts as a failed assertion
 * @returns {void}
 * @throws {Error} With the message "Assertion failed!" when `cond` is falsy
 */
function assert(cond) {
  if (cond) return;
  throw new Error("Assertion failed!");
}
/**
 * Picks up to `count` distinct elements of `arr` at random (sampling without
 * replacement). The input array is never modified. When `count` is at least
 * `arr.length`, a copy of the whole array is returned in its original order.
 * @template {any} T
 * @param {T[]} arr The elements to choose from
 * @param {number} count How many elements to pick at most
 * @returns {T[]} A new array holding the picked elements
 */
function chooseRandom(arr, count) {
  assert(Array.isArray(arr));
  assert(typeof count === "number");
  // Work on a private copy: picked elements are removed from it.
  const pool = Array.from(arr);
  const wanted = Math.min(count, pool.length);
  if (wanted === pool.length) return pool;
  const picked = [];
  for (let taken = 0; taken < wanted; taken++) {
    const at = Math.floor(Math.random() * pool.length);
    const [element] = pool.splice(at, 1);
    picked.push(element);
  }
  return picked;
}
/**
 * The DHT node run by this process. It owns the routing table (one bucket of
 * remote nodes per XOR-distance range in `bucketRanges`), the in-memory
 * key/value store, and the HTTP API that exposes both to other nodes.
 */
class LocalNode {
  /** @type {bigint} @readonly */
  id;
  /** @private @type {RemoteNode[][]} @readonly */
  buckets;
  /** @private @type {Map<bigint, [Date, Uint8Array][]>} @readonly */
  storage;
  /** @private @type {boolean} */
  isGarbageCollectingBuckets = false;

  /**
   * @param {bigint} id The id of this node
   */
  constructor(id) {
    assert(typeof id === "bigint");
    this.id = id;
    this.buckets = Array.from({ length: bucketRanges.length }, () => []);
    this.storage = new Map();
  }

  /**
   * Appends `value` to the entries stored under `key`, stamped with the
   * current time. Earlier entries for the same key are kept.
   * @param {bigint} key
   * @param {Uint8Array} value At most `MAX_VALUE_LENGTH` bytes
   * @returns {void}
   * @throws {Error} When an argument has the wrong type or `value` is too long
   */
  put(key, value) {
    assert(typeof key === "bigint" && value instanceof Uint8Array);
    assert(value.byteLength <= MAX_VALUE_LENGTH);
    const entries = this.storage.get(key) ?? [];
    entries.push([new Date(), value]);
    this.storage.set(key, entries);
  }

  /**
   * @param {bigint} key
   * @param {number | null} limit How many entries shall be returned at max;
   *   `null` means no limit, values below zero are treated as zero
   * @returns {[Date, Uint8Array][]} The entries sorted newest first (a new
   *   array; the stored list is left untouched)
   */
  get(key, limit) {
    assert(typeof key === "bigint");
    assert(typeof limit === "number" || limit === null);
    // Sort a copy so that reading never reorders the stored list.
    const entries = [...(this.storage.get(key) ?? [])].sort(
      ([a], [b]) => b.valueOf() - a.valueOf(),
    );
    if (limit === null) return entries;
    // Clamp: a negative end index would make slice() count from the end.
    return entries.slice(0, Math.max(0, Math.floor(limit)));
  }

  /**
   * @param {bigint} key The key hash
   * @param {number | null} limit How many nodes shall be returned at max;
   *   `null` means no limit, values below zero are treated as zero
   * @returns {RemoteNode[]} The nodes ordered closest first (relative to `key`)
   */
  findClosestRemoteNodes(key, limit) {
    assert(typeof key === "bigint");
    assert(typeof limit === "number" || limit === null);
    // XOR distances are BigInts. A sort comparator must return a Number, and
    // a BigInt result makes sort() throw, so compare explicitly.
    const nodes = this.allRemoteNodes().sort((a, b) => {
      const distA = a.id ^ key;
      const distB = b.id ^ key;
      if (distA < distB) return -1;
      if (distA > distB) return 1;
      return 0;
    });
    if (limit === null) return nodes;
    return nodes.slice(0, Math.max(0, Math.floor(limit)));
  }

  /**
   * @returns {RemoteNode[]} All remote nodes in undefined order (a new array)
   */
  allRemoteNodes() {
    return this.buckets.flat();
  }

  /**
   * Adds `remoteNode` to the bucket matching its distance from this node.
   * The node is silently ignored when no bucket matches (e.g. it has our own
   * id), when the bucket is full, or when the id is already known.
   * @param {RemoteNode} remoteNode The node to add
   * @returns {void}
   */
  addRemoteNode(remoteNode) {
    assert(remoteNode instanceof RemoteNode);
    const bucket = this._bucketFor(remoteNode.id);
    if (bucket === null || bucket.length >= BUCKETSIZE) return;
    if (bucket.some((node) => node.id === remoteNode.id)) return;
    bucket.push(remoteNode);
  }

  /**
   * Clears `buckets` from all `RemoteNode`s that aren't alive anymore.
   * Nodes are pinged in batches of `ALPHA_VALUE`. A call made while another
   * sweep is still running returns immediately.
   * @returns {Promise<void>}
   */
  async garbageCollectBuckets() {
    if (this.isGarbageCollectingBuckets) return;
    this.isGarbageCollectingBuckets = true;
    try {
      const allRemoteNodes = this.allRemoteNodes();
      const toRemove = [];
      for (let i = 0; i < allRemoteNodes.length; i += ALPHA_VALUE) {
        const batch = allRemoteNodes.slice(i, i + ALPHA_VALUE);
        const alives = await Promise.all(batch.map((node) => node.isAlive()));
        batch.forEach((node, j) => {
          if (!alives[j]) toRemove.push(node);
        });
      }
      toRemove.forEach((node) => console.log(`[gc] removing dead ${node}`));
      const toRemoveIds = new Set(toRemove.map((node) => node.id));
      // Filter the *current* buckets so nodes added while we were pinging
      // are preserved.
      this.buckets.forEach((bucket, i) => {
        this.buckets[i] = bucket.filter((node) => !toRemoveIds.has(node.id));
      });
    } finally {
      this.isGarbageCollectingBuckets = false;
    }
  }

  /**
   * Removes all entries that have expired, i.e. are older than `TTL_MINUTES`.
   * Keys left without entries are deleted.
   * @returns {void}
   */
  garbageCollectEntries() {
    const now = Date.now();
    const ttlMs = TTL_MINUTES * 60 * 1000;
    // Iterate over a snapshot because the map is modified inside the loop.
    for (const [key, entries] of [...this.storage.entries()]) {
      const fresh = entries.filter(([date]) => now - date.getTime() <= ttlMs);
      if (fresh.length) {
        this.storage.set(key, fresh);
      } else {
        this.storage.delete(key);
      }
    }
  }

  /**
   * Bootstrap the node by asking for existing nodes
   * @param {string[]} hosts The hosts/nodes to ask
   * @returns {Promise<void>}
   */
  async bootstrap(hosts) {
    for (let i = 0; i < hosts.length; i += ALPHA_VALUE) {
      // Contact the hosts in batches of ALPHA_VALUE, starting at offset i.
      const currHosts = hosts
        .slice(i, i + ALPHA_VALUE)
        .filter((host) => host);
      await Promise.all(currHosts.map((host) => this._bootstrapFrom(host)));
    }
  }

  /**
   * Contacts one bootstrap host: adds it, adds up to 30 randomly chosen nodes
   * it reports (each verified by pinging it), and finally announces ourselves.
   * @private
   * @param {string} host
   * @returns {Promise<void>}
   */
  async _bootstrapFrom(host) {
    console.log("[bootstrap] checking", host);
    const node = await RemoteNode.fromHost(host);
    if (!node) return;
    console.log(`[bootstrap] adding ${node}`);
    this.addRemoteNode(node);
    try {
      const jsonNodes = await node.find();
      if (jsonNodes) {
        await Promise.all(
          chooseRandom(jsonNodes, 30).map(async ([idHex, peerHost]) => {
            try {
              const id = BigInt("0x" + idHex);
              if (id === this.id) return;
              const peer = await RemoteNode.fromHost(peerHost);
              // Skip unreachable peers and peers whose real id differs from
              // the one that was reported to us.
              if (!peer || peer.id !== id) return;
              console.log(`[bootstrap] adding ${peer}`);
              this.addRemoteNode(peer);
            } catch (_) {
              // Best effort: a malformed entry from a peer only skips it.
            }
          }),
        );
      }
    } catch (err) {
      // Best effort: failing to learn peers must not abort bootstrapping.
      console.error(`[bootstrap] peer discovery via ${node} failed`, err);
    }
    // Deliberately not awaited; notify() swallows its own request errors.
    node.notify(HOST);
  }

  /**
   * Builds the HTTP API of this node:
   * - `GET /ping`: our id as 40 hex digits, status 242
   * - `GET /get/:key[?limit=n]`: stored entries as `[utcDate, base64][]`
   * - `POST /put/:key`: stores the base64 encoded request body
   * - `GET /find`: all known nodes as `[idHex, host][]`
   * - `GET /find/:key[?limit=n]`: known nodes closest to `key`
   * - `POST /notify`: body is a host that should be pinged and added
   * @returns {express.Express}
   */
  createServer() {
    const self = this;
    const server = express();

    // A base64 value of MAX_VALUE_LENGTH bytes needs about 4/3 of that, so
    // twice the value size leaves room for padding and line breaks while
    // bounding how much an untrusted client can make us buffer.
    const MAX_BODY_BYTES = 2 * MAX_VALUE_LENGTH;

    /** Parses a key given as exactly 40 hex digits; throws otherwise. */
    const parseKey = (keyHex) => {
      assert(typeof keyHex === "string" && /^[0-9a-f]{40}$/i.test(keyHex));
      return BigInt("0x" + keyHex);
    };

    /** Parses the optional `limit` query value; throws when not a number. */
    const parseLimit = (raw) => {
      if (!raw) return null;
      const limit = Number.parseInt(raw, 10);
      assert(!Number.isNaN(limit));
      return limit;
    };

    /**
     * Collects the request body. Rejects when the connection fails or closes
     * early, when the body exceeds MAX_BODY_BYTES, or after REQ_TIMEOUT_MS.
     */
    const readBody = (req) =>
      new Promise((resolve, reject) => {
        const chunks = [];
        let received = 0;
        const timer = setTimeout(
          () => reject(new Error("request body timed out")),
          REQ_TIMEOUT_MS,
        );
        const fail = (err) => {
          clearTimeout(timer);
          reject(err);
        };
        req.on("data", (chunk) => {
          received += chunk.length;
          if (received > MAX_BODY_BYTES) {
            fail(new Error("request body too large"));
            return;
          }
          chunks.push(chunk);
        });
        req.on("end", () => {
          clearTimeout(timer);
          resolve(Buffer.concat(chunks));
        });
        req.on("error", () => fail(new Error("request failed")));
        // No effect once "end" has resolved the promise.
        req.on("close", () => fail(new Error("connection closed")));
      });

    const toJsonNode = (node) => [
      node.id.toString(16).padStart(40, "0"),
      node.host,
    ];

    server.get("/ping", (req, res) => {
      const key = self.id.toString(16).padStart(40, "0");
      res.status(242).send(key).end();
    });

    server.get("/get/:key", (req, res) => {
      let key;
      let limit;
      try {
        key = parseKey(req.params.key);
        limit = parseLimit(req.query.limit);
      } catch (_) {
        res.status(400).end();
        return;
      }
      console.log(`[/get/:key] key = ${key.toString(16)}`);
      const data = self.get(key, limit).map(([date, value]) => {
        return [date.toUTCString(), Buffer.from(value).toString("base64")];
      });
      res.status(200).json(data).end();
    });

    server.post("/put/:key", async (req, res) => {
      let key;
      let data;
      try {
        key = parseKey(req.params.key);
        const b64Text = (await readBody(req)).toString("ascii").trim();
        data = new Uint8Array(Buffer.from(b64Text, "base64"));
        assert(data.byteLength <= MAX_VALUE_LENGTH);
      } catch (_) {
        res.status(400).end();
        return;
      }
      try {
        console.log(`[/put/:key] key = ${key.toString(16)}`);
        self.put(key, data);
        res.status(200).end();
      } catch (_) {
        res.status(500).end();
      }
    });

    server.get("/find", (req, res) => {
      const data = self.allRemoteNodes().map(toJsonNode);
      res.status(200).json(data).end();
    });

    server.get("/find/:key", (req, res) => {
      let key;
      let limit;
      try {
        key = parseKey(req.params.key);
        limit = parseLimit(req.query.limit);
      } catch (_) {
        res.status(400).end();
        // Stop here: `key` is not usable and the response is already sent.
        return;
      }
      console.log(`[/find/:key] key = ${key.toString(16)}`);
      const data = self.findClosestRemoteNodes(key, limit).map(toJsonNode);
      res.status(200).json(data).end();
    });

    server.post("/notify", async (req, res) => {
      try {
        const host = (await readBody(req)).toString("ascii").trim();
        console.log("[/notify] checking", host);
        const node = await RemoteNode.fromHost(host);
        if (node) {
          console.log(`[/notify] adding ${node}`);
          self.addRemoteNode(node);
        }
      } catch (_) {
        // Best effort: a bad or unreachable host is simply not added.
      }
      // Always signal success because we don't care
      res.status(200).end();
    });

    server.use((err, req, res, next) => {
      console.error(err);
      if (res.headersSent) {
        // Too late to change the status; let Express close the connection.
        next(err);
        return;
      }
      res.status(500).end();
    });

    return server;
  }

  /**
   * @param {bigint} id
   * @returns {boolean} Whether a remote node with this id is in the buckets;
   *   `false` when no bucket covers `id` (e.g. for our own id)
   */
  _knows(id) {
    const bucket = this._bucketFor(id);
    if (bucket === null) return false;
    return bucket.some((node) => node.id === id);
  }

  /**
   * Finds the bucket whose range contains the XOR distance between this
   * node's id and `key`.
   * @private
   * @param {bigint} key
   * @returns {RemoteNode[] | null} The bucket, or `null` when the distance is
   *   outside every range (it is 0 for our own id)
   */
  _bucketFor(key) {
    assert(typeof key === "bigint");
    const d = this.id ^ key;
    const bucketIndex = bucketRanges.findIndex(
      ([start, end]) => d >= start && d < end,
    );
    if (bucketIndex < 0) return null;
    return this.buckets.at(bucketIndex) ?? null;
  }
}
/**
 * Handle for another DHT node: its id plus the HTTP address under which its
 * API (`/ping`, `/notify`, `/find`) is reached.
 */
class RemoteNode {
  /** @type {bigint} @readonly */
  id;
  /** @type {string} @readonly */
  host;

  /**
   * @param {bigint} id
   * @param {string} host Address of the node; trailing slashes are removed
   */
  constructor(id, host) {
    assert(typeof id === "bigint");
    assert(typeof host === "string");
    let base = host;
    while (base.length > 0 && base.endsWith("/")) {
      base = base.slice(0, -1);
    }
    this.id = id;
    this.host = base;
  }

  /**
   * Pings the node and checks that it still answers with the expected id.
   * @returns {Promise<boolean>} `false` on any request error, on an
   *   unexpected response, or when the reported id differs from `this.id`
   */
  async isAlive() {
    try {
      const { status, data } = await axios({
        url: `${this.host}/ping`,
        method: "get",
        timeout: ALIVE_TIMEOUT_MS,
      });
      // A valid answer has status 242 and a body of 40 characters.
      const wellFormed =
        status === 242 && typeof data === "string" && data.trim().length === 40;
      return wellFormed && BigInt("0x" + data.trim()) === this.id;
    } catch (_) {
      return false;
    }
  }

  /**
   * Notifies the remote node about the given `host`.
   * Request failures are ignored.
   * @param {string} host
   * @returns {Promise<void>}
   */
  async notify(host) {
    const request = axios({
      url: `${this.host}/notify`,
      method: "post",
      headers: {
        "Content-Type": "text/plain",
      },
      data: host,
      timeout: 5000,
    });
    await request.catch(() => {});
  }

  /**
   * Asks the remote node for the nodes it knows.
   * @returns {Promise<[string, string][] | null>} The parsed JSON result, a
   *   list of two-string arrays; `null` on error or when the response does
   *   not have that shape
   */
  async find() {
    try {
      const res = await axios({
        url: `${this.host}/find`,
        method: "get",
        timeout: 3000,
      });
      assert(res.status === 200);
      assert(Array.isArray(res.data));
      const isStringPair = (entry) =>
        Array.isArray(entry) &&
        entry.length === 2 &&
        typeof entry[0] === "string" &&
        typeof entry[1] === "string";
      assert(res.data.every(isStringPair));
      return res.data;
    } catch (_) {
      return null;
    }
  }

  /**
   * @returns {string} The host followed by the id in hexadecimal
   */
  toString() {
    const hexId = this.id.toString(16);
    return `${this.host} (${hexId})`;
  }

  /**
   * Pings `host` and builds a `RemoteNode` from the id it reports.
   * @param {string} host
   * @returns {Promise<RemoteNode | null>} `null` when the host cannot be
   *   reached or does not answer like a node; request errors are logged
   */
  static async fromHost(host) {
    try {
      let base = host;
      while (base.length && base.endsWith("/")) {
        base = base.slice(0, -1);
      }
      const { status, data } = await axios({
        url: `${base}/ping`,
        method: "get",
        timeout: 2000,
      });
      if (status !== 242 || typeof data !== "string") return null;
      const hexId = data.trim();
      if (hexId.length !== 40) return null;
      return new RemoteNode(BigInt("0x" + hexId), base);
    } catch (e) {
      console.error(e);
      return null;
    }
  }
}
// Id of this node: the SHA-1 digest (160 bits) of 20 random bytes.
const id = BigInt(
  "0x" + crypto.createHash("sha1").update(crypto.randomBytes(20)).digest("hex"),
);
const node = new LocalNode(id);

// Background loop: drop expired entries every 5 seconds. A failing sweep is
// logged and retried on the next tick instead of ending the loop with an
// unhandled rejection.
(async () => {
  while (true) {
    try {
      node.garbageCollectEntries();
    } catch (err) {
      console.error("[gc] removing expired entries failed", err);
    }
    await new Promise((res) => setTimeout(res, 5000));
  }
})();

// Background loop: drop remote nodes that no longer answer, then wait 5
// seconds before the next sweep. Failures are handled like above.
(async () => {
  while (true) {
    try {
      await node.garbageCollectBuckets();
    } catch (err) {
      console.error("[gc] removing dead nodes failed", err);
    }
    await new Promise((res) => setTimeout(res, 5000));
  }
})();

// Start serving first, then contact the bootstrap hosts so that they can
// already reach us when they ping back.
node.createServer().listen(PORT, () => {
  console.log("[server] listening on port", PORT);
  node.bootstrap(BOOTSTRAP).catch((err) => {
    console.error("[bootstrap] failed", err);
  });
});