604 lines
14 KiB
JavaScript
604 lines
14 KiB
JavaScript
const express = require("express");
|
|
const crypto = require("crypto");
|
|
const axios = require("axios");
|
|
|
|
const PORT = parseInt(process.env.PORT);
|
|
if (isNaN(PORT) || PORT <= 0 || PORT > 10000) {
|
|
console.log("PORT env variable is missing or invalid");
|
|
process.exit(1);
|
|
}
|
|
|
|
const HOST = process.env.HOST;
|
|
if (typeof HOST !== "string" || !HOST.trim()) {
|
|
console.log("HOST env variable missing");
|
|
process.exit(1);
|
|
}
|
|
|
|
const BOOTSTRAP = (() => {
|
|
try {
|
|
if (!process.env.BOOTSTRAP) return [];
|
|
return process.env.BOOTSTRAP.split(",").map((s) => s.trim());
|
|
} catch (_) {
|
|
return [];
|
|
}
|
|
})();
|
|
|
|
const KEYSIZE = 160n;
|
|
const BUCKETSIZE = 10;
|
|
const TTL_MINUTES = 15;
|
|
const ALPHA_VALUE = 10;
|
|
const REQ_TIMEOUT_MS = 3000;
|
|
const ALIVE_TIMEOUT_MS = 1000;
|
|
const MAX_VALUE_LENGTH = 2048;
|
|
|
|
const bucketRanges = [];
|
|
for (let i = 1n; i < KEYSIZE + 1n; i++) {
|
|
const curr = 2n ** (i - 1n);
|
|
const next = 2n ** i;
|
|
bucketRanges.push([curr, next]);
|
|
}
|
|
|
|
function assert(cond) {
|
|
if (!cond) throw new Error("Assertion failed!");
|
|
}
|
|
|
|
/**
|
|
* Chooses `count` many random elements from `arr`;
|
|
* Does not need to randomize element order
|
|
* @template {any} T
|
|
* @param {T[]} arr
|
|
* @param {number} count
|
|
* @returns {T[]}
|
|
*/
|
|
function chooseRandom(arr, count) {
|
|
assert(Array.isArray(arr));
|
|
assert(typeof count === "number");
|
|
|
|
arr = [...arr];
|
|
|
|
count = count > arr.length ? arr.length : count;
|
|
if (count === arr.length) return arr;
|
|
|
|
const res = [];
|
|
while (res.length < count) {
|
|
const index = Math.floor(Math.random() * arr.length);
|
|
res.push(arr[index]);
|
|
arr.splice(index, 1);
|
|
}
|
|
|
|
return res;
|
|
}
|
|
|
|
class LocalNode {
|
|
/** @type {bigint} @readonly */
|
|
id;
|
|
/** @private @type {RemoteNode[][]} @readonly */
|
|
buckets;
|
|
/** @private @type {Map<bigint, [Date, Uint8Array][]>} @readonly */
|
|
storage;
|
|
|
|
/** @private @type {boolean} */
|
|
isGarbageCollectingBuckets = false;
|
|
|
|
/**
|
|
* @param {bigint} id
|
|
*/
|
|
constructor(id) {
|
|
assert(typeof id === "bigint");
|
|
|
|
this.id = id;
|
|
this.buckets = Array(bucketRanges.length)
|
|
.fill(0)
|
|
.map(() => []);
|
|
this.storage = new Map();
|
|
}
|
|
|
|
/**
|
|
* @param {bigint} key
|
|
* @param {Uint8Array} value
|
|
* @returns {void}
|
|
*/
|
|
put(key, value) {
|
|
assert(typeof key === "bigint" && value instanceof Uint8Array);
|
|
assert(value.byteLength <= MAX_VALUE_LENGTH);
|
|
|
|
const entries = this.storage.get(key) ?? [];
|
|
entries.push([new Date(), value]);
|
|
this.storage.set(key, entries);
|
|
}
|
|
|
|
/**
|
|
* @param {bigint} key
|
|
* @param {number | null} limit How many entries shall be returned at max
|
|
* @returns {[Date, Uint8Array][]} The entries sorted newest first
|
|
*/
|
|
get(key, limit) {
|
|
assert(typeof key === "bigint");
|
|
assert(typeof limit === "number" || limit === null);
|
|
|
|
let entries = this.storage.get(key) ?? [];
|
|
entries = entries.sort(([a], [b]) => b.valueOf() - a.valueOf());
|
|
if (limit === null) return entries;
|
|
|
|
limit = Math.floor(limit);
|
|
return entries.slice(0, limit);
|
|
}
|
|
|
|
/**
|
|
* @param {bigint} key The key hash
|
|
* @param {number | null} limit How many nodes shall be returned at max
|
|
* @returns {RemoteNode[]} The nodes ordered closest first (relative to `key`)
|
|
*/
|
|
findClosestRemoteNodes(key, limit) {
|
|
assert(typeof key === "bigint");
|
|
assert(typeof limit == "number" || limit === null);
|
|
|
|
const nodes = this.allRemoteNodes().sort(
|
|
(a, b) => (a.id ^ key) - (b.id ^ key),
|
|
);
|
|
|
|
if (limit === null) return nodes;
|
|
|
|
limit = Math.floor(limit);
|
|
return nodes.slice(0, limit);
|
|
}
|
|
|
|
/**
|
|
* @returns {RemoteNode[]} All remote nodes in undefined order
|
|
*/
|
|
allRemoteNodes() {
|
|
return this.buckets.flat();
|
|
}
|
|
|
|
/**
|
|
* @param {RemoteNode} remoteNode The node to add
|
|
* @returns {void}
|
|
*/
|
|
addRemoteNode(remoteNode) {
|
|
assert(remoteNode instanceof RemoteNode);
|
|
|
|
const bucket = this._bucketFor(remoteNode.id);
|
|
if (bucket === null || bucket.length >= BUCKETSIZE) return;
|
|
if (bucket.some((node) => node.id === remoteNode.id)) return;
|
|
|
|
bucket.push(remoteNode);
|
|
}
|
|
|
|
/**
|
|
* Clears `buckets` from all `RemoteNode`s that aren't alive anymore
|
|
* @returns {Promise<void>}
|
|
*/
|
|
async garbageCollectBuckets() {
|
|
if (this.isGarbageCollectingBuckets) return;
|
|
|
|
try {
|
|
this.isGarbageCollectingBuckets = true;
|
|
|
|
const allRemoteNodes = this.allRemoteNodes();
|
|
const toRemove = [];
|
|
|
|
for (let i = 0; i < allRemoteNodes.length; i += ALPHA_VALUE) {
|
|
const nodes = Array(ALPHA_VALUE)
|
|
.fill(0)
|
|
.map((_, j) => allRemoteNodes[i + j])
|
|
.filter((node) => node);
|
|
|
|
const alives = await Promise.all(nodes.map((node) => node.isAlive()));
|
|
|
|
nodes.forEach((node, i) => {
|
|
if (!alives[i]) toRemove.push(node);
|
|
});
|
|
}
|
|
|
|
toRemove.forEach((node) => console.log(`[gc] removing dead ${node}`));
|
|
|
|
const toRemoveIds = new Set(toRemove.map((node) => node.id));
|
|
this.buckets.forEach((bucket, i) => {
|
|
this.buckets[i] = bucket.filter((node) => !toRemoveIds.has(node.id));
|
|
});
|
|
} finally {
|
|
this.isGarbageCollectingBuckets = false;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Removes all entries that have expired
|
|
* @returns {void}
|
|
*/
|
|
garbageCollectEntries() {
|
|
const now = new Date();
|
|
|
|
[...this.storage.entries()].forEach(([key, entries]) => {
|
|
entries = entries.filter(([date]) => {
|
|
return now.getTime() - date.getTime() <= TTL_MINUTES * 60 * 1000;
|
|
});
|
|
|
|
if (entries.length) {
|
|
this.storage.set(key, entries);
|
|
} else {
|
|
this.storage.delete(key);
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Bootstrap the node by asking for existing nodes
|
|
* @param {string[]} hosts The hosts/nodes to ask
|
|
* @returns {Promise<void>}
|
|
*/
|
|
async bootstrap(hosts) {
|
|
for (let i = 0; i < hosts.length; i += ALPHA_VALUE) {
|
|
const currHosts = Array(ALPHA_VALUE)
|
|
.fill(0)
|
|
.map((_, i) => hosts.at(i))
|
|
.filter((host) => host);
|
|
|
|
await Promise.all(
|
|
currHosts.map(async (host) => {
|
|
console.log("[bootstrap] checking", host);
|
|
|
|
const node = await RemoteNode.fromHost(host);
|
|
if (!node) return;
|
|
|
|
console.log(`[bootstrap] adding ${node}`);
|
|
this.addRemoteNode(node);
|
|
|
|
try {
|
|
await node.find().then(async (jsonNodes) => {
|
|
if (!jsonNodes) return;
|
|
|
|
const randomJsonNodes = chooseRandom(jsonNodes, 30);
|
|
await Promise.all(
|
|
randomJsonNodes.map(async ([id, host]) => {
|
|
try {
|
|
id = BigInt("0x" + id);
|
|
if (id === this.id) return;
|
|
|
|
const node = await RemoteNode.fromHost(host);
|
|
if (node.id !== id) return;
|
|
|
|
console.log(`[bootstrap] adding ${node}`);
|
|
this.addRemoteNode(node);
|
|
} catch (_) {}
|
|
}),
|
|
);
|
|
});
|
|
} catch (_) {}
|
|
|
|
node.notify(HOST);
|
|
}),
|
|
);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @returns {express.Express}
|
|
*/
|
|
createServer() {
|
|
const self = this;
|
|
|
|
const server = express();
|
|
|
|
server.get("/ping", (req, res) => {
|
|
const key = self.id.toString(16).padStart(40, "0");
|
|
res.status(242).send(key).end();
|
|
});
|
|
|
|
server.get("/get/:key", (req, res) => {
|
|
let key;
|
|
let limit = null;
|
|
try {
|
|
const keyHex = req.params.key;
|
|
assert(keyHex.length === 40);
|
|
key = BigInt("0x" + keyHex);
|
|
|
|
if (req.query.limit) {
|
|
limit = parseInt(req.query.limit);
|
|
assert(!isNan(limit));
|
|
}
|
|
} catch (_) {
|
|
res.status(400).end();
|
|
return;
|
|
}
|
|
|
|
console.log(`[/get/:key] key = ${key.toString(16)}`);
|
|
|
|
const data = self.get(key, limit).map(([date, data]) => {
|
|
return [date.toUTCString(), Buffer.from(data).toString("base64")];
|
|
});
|
|
|
|
res.status(200).json(data).end();
|
|
});
|
|
|
|
server.post("/put/:key", async (req, res) => {
|
|
let key;
|
|
let data;
|
|
try {
|
|
const keyHex = req.params.key;
|
|
assert(keyHex.length === 40);
|
|
key = BigInt("0x" + keyHex);
|
|
|
|
data = await new Promise((res, rej) => {
|
|
const chunks = [];
|
|
req.on("data", (chunk) => chunks.push(chunk));
|
|
req.on("end", () => {
|
|
try {
|
|
const b64Text = Buffer.concat(chunks).toString("ascii").trim();
|
|
res(new Uint8Array(Buffer.from(b64Text, "base64")));
|
|
} catch (_) {
|
|
rej(0);
|
|
}
|
|
});
|
|
|
|
req.on("error", () => rej(0));
|
|
req.on("close", () => rej(0));
|
|
|
|
setTimeout(() => rej(0), REQ_TIMEOUT_MS);
|
|
});
|
|
|
|
assert(data instanceof Uint8Array);
|
|
assert(data.byteLength <= MAX_VALUE_LENGTH);
|
|
} catch (_) {
|
|
res.status(400).end();
|
|
return;
|
|
}
|
|
|
|
try {
|
|
console.log(`[/put/:key] key = ${key.toString(16)}`);
|
|
|
|
self.put(key, data);
|
|
res.status(200).end();
|
|
} catch (_) {
|
|
res.status(500).end();
|
|
}
|
|
});
|
|
|
|
server.get("/find", (req, res) => {
|
|
const data = self.allRemoteNodes().map((node) => {
|
|
const id = node.id.toString(16).padStart(40, "0");
|
|
return [id, node.host];
|
|
});
|
|
|
|
res.status(200).json(data).end();
|
|
});
|
|
|
|
server.get("/find/:key", (req, res) => {
|
|
let key;
|
|
let limit = null;
|
|
try {
|
|
const keyHex = req.params.key;
|
|
assert(keyHex.length === 40);
|
|
key = BigInt("0x" + keyHex);
|
|
|
|
if (req.query.limit) {
|
|
limit = parseInt(req.query.limit);
|
|
assert(!isNaN(limit));
|
|
}
|
|
} catch (_) {
|
|
res.status(400).end();
|
|
}
|
|
|
|
console.log(`[/find/:key] key = ${key.toString(16)}`);
|
|
|
|
const data = self.findClosestRemoteNodes(key, limit).map((node) => {
|
|
const id = node.id.toString(16).padStart(40, "0");
|
|
return [id, node.host];
|
|
});
|
|
|
|
res.status(200).json(data).end();
|
|
});
|
|
|
|
server.post("/notify", async (req, res) => {
|
|
try {
|
|
const host = await new Promise((res, rej) => {
|
|
const chunks = [];
|
|
req.on("data", (chunk) => chunks.push(chunk));
|
|
req.on("end", () => {
|
|
try {
|
|
res(Buffer.concat(chunks).toString("ascii").trim());
|
|
} catch (_) {
|
|
rej(0);
|
|
}
|
|
});
|
|
|
|
req.on("error", () => rej(0));
|
|
req.on("close", () => rej(0));
|
|
|
|
setTimeout(() => rej(0), REQ_TIMEOUT_MS);
|
|
});
|
|
|
|
console.log("[/notify] checking", host);
|
|
const node = await RemoteNode.fromHost(host);
|
|
if (node) {
|
|
console.log(`[/notify] adding ${node}`);
|
|
this.addRemoteNode(node);
|
|
}
|
|
} catch (_) {}
|
|
|
|
// Always signal success because we don't care
|
|
res.status(200).end();
|
|
});
|
|
|
|
server.use((err, req, res, next) => {
|
|
console.error(err);
|
|
res.status(500).end();
|
|
});
|
|
|
|
return server;
|
|
}
|
|
|
|
/**
|
|
* @param {bigint} id
|
|
* @returns {boolean}
|
|
*/
|
|
_knows(id) {
|
|
const bucket = this._bucketFor(id);
|
|
return bucket.some((node) => node.id === id);
|
|
}
|
|
|
|
/**
|
|
* @private
|
|
* @param {bigint} key
|
|
* @returns {RemoteNode[] | null}
|
|
*/
|
|
_bucketFor(key) {
|
|
assert(typeof key === "bigint");
|
|
|
|
const d = this.id ^ key;
|
|
const bucketIndex = bucketRanges.findIndex(
|
|
([start, end]) => d >= start && d < end,
|
|
);
|
|
if (bucketIndex < 0) return null;
|
|
|
|
return this.buckets.at(bucketIndex) ?? null;
|
|
}
|
|
}
|
|
|
|
class RemoteNode {
|
|
/** @type {bigint} @readonly */
|
|
id;
|
|
/** @type {string} @readonly */
|
|
host;
|
|
|
|
/**
|
|
* @param {bigint} id
|
|
* @param {string} host
|
|
*/
|
|
constructor(id, host) {
|
|
assert(typeof id === "bigint" && typeof host === "string");
|
|
|
|
while (host.length && host.endsWith("/")) {
|
|
host = host.substring(0, host.length - 1);
|
|
}
|
|
|
|
this.id = id;
|
|
this.host = host;
|
|
}
|
|
|
|
/**
|
|
* @returns {Promise<boolean>}
|
|
*/
|
|
async isAlive() {
|
|
try {
|
|
const res = await axios({
|
|
method: "get",
|
|
url: `${this.host}/ping`,
|
|
timeout: ALIVE_TIMEOUT_MS,
|
|
});
|
|
|
|
if (res.status !== 242) return false;
|
|
|
|
if (typeof res.data !== "string") return false;
|
|
if (res.data.trim().length !== 40) return false;
|
|
|
|
const id = BigInt("0x" + res.data.trim());
|
|
return id === this.id;
|
|
} catch (_) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Notifies the remote node about the given `host`
|
|
* @param {string} host
|
|
* @returns {Promise<void>}
|
|
*/
|
|
async notify(host) {
|
|
await axios({
|
|
method: "post",
|
|
url: `${this.host}/notify`,
|
|
data: host,
|
|
headers: {
|
|
"Content-Type": "text/plain",
|
|
},
|
|
timeout: 5000,
|
|
}).catch(() => {});
|
|
}
|
|
|
|
/**
|
|
* @returns {Promise<[string, string][] | null>} The parsed JSON result; `null` on error
|
|
*/
|
|
async find() {
|
|
try {
|
|
const res = await axios({
|
|
method: "get",
|
|
url: `${this.host}/find`,
|
|
timeout: 3000,
|
|
});
|
|
|
|
assert(res.status === 200);
|
|
assert(Array.isArray(res.data));
|
|
|
|
res.data.forEach((e) => {
|
|
assert(Array.isArray(e));
|
|
assert(e.length === 2);
|
|
assert(typeof e[0] === "string");
|
|
assert(typeof e[1] === "string");
|
|
});
|
|
|
|
return res.data;
|
|
} catch (_) {
|
|
return null;
|
|
}
|
|
}
|
|
|
|
toString() {
|
|
return `${this.host} (${this.id.toString(16)})`;
|
|
}
|
|
|
|
/**
|
|
* @param {string} host
|
|
* @returns {Promise<RemoteNode | null>}
|
|
*/
|
|
static async fromHost(host) {
|
|
try {
|
|
while (host.length && host.endsWith("/")) {
|
|
host = host.substring(0, host.length - 1);
|
|
}
|
|
|
|
const res = await axios({
|
|
method: "get",
|
|
url: `${host}/ping`,
|
|
timeout: 2000,
|
|
});
|
|
|
|
if (res.status !== 242) return null;
|
|
|
|
if (typeof res.data !== "string") return null;
|
|
if (res.data.trim().length !== 40) return null;
|
|
|
|
const id = BigInt("0x" + res.data.trim());
|
|
|
|
return new RemoteNode(id, host);
|
|
} catch (e) {
|
|
console.error(e);
|
|
return null;
|
|
}
|
|
}
|
|
}
|
|
|
|
const id = BigInt(
|
|
"0x" + crypto.createHash("sha1").update(crypto.randomBytes(20)).digest("hex"),
|
|
);
|
|
|
|
const node = new LocalNode(id);
|
|
|
|
(async () => {
|
|
while (true) {
|
|
node.garbageCollectEntries();
|
|
await new Promise((res) => setTimeout(res, 5000));
|
|
}
|
|
})();
|
|
|
|
(async () => {
|
|
while (true) {
|
|
await node.garbageCollectBuckets();
|
|
await new Promise((res) => setTimeout(res, 5000));
|
|
}
|
|
})();
|
|
|
|
node.createServer().listen(PORT, () => {
|
|
console.log("[server] listening on port", PORT);
|
|
node.bootstrap(BOOTSTRAP);
|
|
});
|