dcp-sort
This app demonstrates how to sort values in a distributed manner using the Distributive Compute Protocol (DCP). If you already understand how DCP works and its terminology, feel free to skip the refresher and jump straight to the setup.
A refresher
DCP is a distributed computing framework made from web-based technology.
Computers and devices in classrooms, computer labs, households, and enterprises can be turned into computing clusters with a single click with DCP. The computational workload, a job, is subdivided into smaller parts called slices, which DCP executes in parallel. In this app, each slice corresponds to one chunk of the array we wish to sort. Slices are sent to different Workers (for example, laptops, desktops, and cell phones), which contain sandboxes. A sandbox is a clean environment on a Worker that runs your work function.
DCP is a powerful parallel computing framework that allows users to express a computational job as effortlessly as:
job = compute.for(inputSet, workFunction);
resultSet = await job.exec();
A workFunction is mapped onto each element in an inputSet. Each mapping represents a slice. Slices are distributed across DCP networks for computation, and the results are returned to the user as a single result set.
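The map-onto-slices idea can be mimicked locally to see its shape. The sketch below is a hypothetical stand-in, not the DCP API: localComputeFor runs the work function once per input element (one "slice" each) inside this Node process and resolves to an array of results in input order. A real DCP work function would also call progress(), which this local sketch does not provide.

```javascript
// Hypothetical local stand-in for compute.for(...).exec(); NOT the DCP API.
// Each element of inputSet becomes one "slice" processed by workFn.
async function localComputeFor(inputSet, workFn) {
  // Start every slice at once; Promise.all keeps results in input order.
  const slices = inputSet.map((element) => Promise.resolve().then(() => workFn(element)));
  return Promise.all(slices);
}

// Example work function: square one input value.
async function square(x) {
  return x * x;
}

localComputeFor([1, 2, 3, 4], square).then((resultSet) => {
  console.log(resultSet); // [ 1, 4, 9, 16 ]
});
```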
Remember, the work function must call progress(). Calling progress() tells the scheduler that the job is still alive and running. Progress is the heartbeat of the job: without it, the job would die and no results would be returned.
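For a work function that runs longer than sorting a few numbers, a common pattern is to call progress() periodically inside the main loop rather than only once. The sketch below relies on nothing about DCP beyond progress() being callable with no arguments, as in this tutorial. The progress() defined here is a hypothetical local stub that only counts calls so the code runs in plain Node; inside a DCP sandbox you would omit it, since the platform supplies progress(). sumOfSquares is an illustrative name.

```javascript
// Hypothetical local stub: in a DCP sandbox, progress() is provided for you.
// Here it only counts calls so the function can run in plain Node.
let progressCalls = 0;
function progress() {
  progressCalls += 1;
}

// A work function that reports progress periodically during a long loop.
function sumOfSquares(n) {
  let total = 0;
  for (let i = 1; i <= n; i += 1) {
    total += i * i;
    if (i % 1000 === 0) progress(); // heartbeat every 1000 iterations
  }
  progress(); // final heartbeat before returning
  return total;
}

console.log(sumOfSquares(5000), progressCalls); // 41679167500 6
```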
1. Create compute nodes: go to https://dcp.work on as many devices as you want and click Start to join the public Compute Group, or go to dcp.work/joinKey if you have a private Compute Group joinKey and joinSecret.
2. Configure the dev environment: load dcp-client and any required packages.
3. Specify the inputSet: an arbitrary but enumerable input dataset (parameters, mp3 files, images, Blender project files, etc.).
4. Specify the workFunction: an arbitrary work function (physics simulation, inference model, rendering process, etc.).
5. Express the job: map the workFunction onto the inputSet with job = compute.for(inputSet, workFunction);
6. (Optional) Specify a Compute Group with job.computeGroups = [{ joinKey: 'name', joinSecret: 'passphrase' }];
7. Await the resultSet: execute the job in parallel on DCP via resultSet = await job.exec();
Setup
Create a directory for your project and install the DCP client package by running the following command in your terminal window.
npm i dcp-client
Next, write the following helper functions. Note that they’re app-specific, not critical to DCP.
generateInput() generates random numbers to sort.
splitInput() splits an array of numbers into chunks so that workers can sort each chunk in parallel.
function generateInput(amount) {
const array = [];
for (let i = 0; i < amount; i += 1) {
array.push(Math.floor(Math.random() * 1000));
}
return array;
}
function splitInput(arr, numChunks) {
const chunks = [];
for (let i = 0; i < numChunks; i += 1) chunks[i] = [];
for (let i = 0; i < arr.length; i += 1) chunks[i % numChunks].push(arr[i]);
return chunks;
}
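splitInput() deals elements out round-robin: element i goes to chunk i % numChunks, so chunk sizes differ by at most one. The usage sketch below copies splitInput() from above and feeds it a fixed array in place of generateInput() so the output is predictable.

```javascript
// Same round-robin splitter as above: element i goes to chunk i % numChunks.
function splitInput(arr, numChunks) {
  const chunks = [];
  for (let i = 0; i < numChunks; i += 1) chunks[i] = [];
  for (let i = 0; i < arr.length; i += 1) chunks[i % numChunks].push(arr[i]);
  return chunks;
}

// Fixed input instead of generateInput() so the output is predictable.
console.log(splitInput([5, 3, 8, 1, 9, 2, 7], 3));
// [ [ 5, 1, 7 ], [ 3, 9 ], [ 8, 2 ] ]
```

If numChunks is larger than the array length, some chunks come back empty; for example, splitInput([1], 3) returns [[1], [], []].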
Next, create the asynchronous main() function. The program first initializes the dcp-client module, which provides access to the Compute API; main() then loads that API with require('dcp/compute').
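async function main() {
const compute = require('dcp/compute');
const TOTAL = 13; // How many numbers to generate total
const CHUNKS = 3; // Total number of chunks
/* INPUT DATA */
/* WORK FUNCTION */
/* COMPUTE FOR */
/* PROCESS RESULTS */
}
require('dcp-client')
.init('https://scheduler.distributed.computer')
.then(main)
.catch(console.error)
.finally(process.exit);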
Input data
Within main(), use the helper functions to generate some random input and split it into chunks.
/* INPUT DATA */
const sourceSet = generateInput(TOTAL);
console.log('Source Set: ', sourceSet);
const inputSet = splitInput(sourceSet, CHUNKS);
console.log('Chunks Set: ', inputSet);
Work function
Still within main(), define the work function. DCP sends each worker one element of the input set, and the worker runs this work function on it. In this app, the work function sorts one chunk in ascending numeric order and returns it.
/* WORK FUNCTION
* Returns one sorted chunk per slice */
async function workFn(arr) {
progress();
return arr.sort(function (a, b) {
return a - b;
});
}
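The comparator passed to sort() matters: without one, Array.prototype.sort converts elements to strings and orders them lexicographically, so 100 sorts before 25. The sketch below shows the difference on a small array; it copies the array with the spread operator because sort() reorders in place.

```javascript
// Without a comparator, Array.prototype.sort compares elements as strings.
const values = [10, 9, 1, 100, 25];

console.log([...values].sort());                // [ 1, 10, 100, 25, 9 ]
console.log([...values].sort((a, b) => a - b)); // [ 1, 9, 10, 25, 100 ]
```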
compute.for
Now create the DCP job by calling compute.for(). Once the job is executed, DCP distributes the elements of the input set to workers in parallel, and each worker runs the work function on the element it receives. The job.public.name line gives the job a human-readable name.
/* COMPUTE FOR */
const job = compute.for(inputSet, workFn);
job.public.name = 'dcp-sort - Node';
Compute Group
If you are using a Compute Group, add the following line with the join information for that group. Skip this section if you won’t be using a Compute Group.
// SKIP IF: you don't need a Compute Group
job.computeGroups = [{ joinKey: 'Your Key', joinSecret: 'Your Secret' }];
Process results
First, await the results from job.exec(). Each element of the results is a sorted chunk of numbers, and Array.from() converts the result set into a plain array.
/* PROCESS RESULTS */
let resultSet = await job.exec();
resultSet = Array.from(resultSet);
console.log('Sorted Chunks: ', resultSet);
The last step is to merge these chunks into one sorted list. Since the chunks are already sorted, a greedy approach works well: the following function looks at the first element of each chunk (always that chunk's smallest), finds the chunk with the lowest first value, removes that value with shift(), and appends it to the final sorted array. Once none of the chunks have any remaining values, the final array is complete and sorted.
The function chunkSort() handles this, and it’s not DCP-specific.
function chunkSort(arr) {
const sorted = [];
while (arr.length) {
let currentChunk = arr[0];
let bestChunkIndex = 0;
let bestValue = currentChunk[0];
for (let i = 1; i < arr.length; i += 1) {
currentChunk = arr[i];
if (currentChunk[0] < bestValue) {
[bestValue] = currentChunk;
bestChunkIndex = i;
}
}
sorted.push(arr[bestChunkIndex].shift());
if (arr[bestChunkIndex].length === 0) arr.splice(bestChunkIndex, 1);
}
return sorted;
}
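chunkSort() scans the head of every chunk for each value it emits, so merging n values from k chunks takes roughly n·k comparisons, and each shift() can cost time proportional to the chunk length. When there are many chunks, a common alternative is a k-way merge driven by a binary min-heap, which picks the smallest head in O(log k). The sketch below is that swapped-in technique, not part of the original app, and heapMerge is a hypothetical name. It reads chunks through index pointers instead of shift(), so the input is left unchanged, and it skips empty chunks (which splitInput() produces when CHUNKS exceeds TOTAL; chunkSort() as written would push undefined for them).

```javascript
// k-way merge of already-sorted chunks using a binary min-heap.
// Alternative to chunkSort(); does not mutate the input chunks.
function heapMerge(chunks) {
  // Each heap entry: [value, chunkIndex, positionInChunk]
  const heap = [];
  const less = (a, b) => a[0] < b[0];

  function push(entry) {
    heap.push(entry);
    let i = heap.length - 1;
    // Sift up until the parent is not larger.
    while (i > 0) {
      const parent = (i - 1) >> 1;
      if (!less(heap[i], heap[parent])) break;
      [heap[i], heap[parent]] = [heap[parent], heap[i]];
      i = parent;
    }
  }

  function pop() {
    const top = heap[0];
    const last = heap.pop();
    if (heap.length > 0) {
      // Move the last entry to the root and sift it down.
      heap[0] = last;
      let i = 0;
      for (;;) {
        const left = 2 * i + 1;
        const right = left + 1;
        let smallest = i;
        if (left < heap.length && less(heap[left], heap[smallest])) smallest = left;
        if (right < heap.length && less(heap[right], heap[smallest])) smallest = right;
        if (smallest === i) break;
        [heap[i], heap[smallest]] = [heap[smallest], heap[i]];
        i = smallest;
      }
    }
    return top;
  }

  // Seed the heap with the first element of every non-empty chunk.
  chunks.forEach((chunk, c) => {
    if (chunk.length > 0) push([chunk[0], c, 0]);
  });

  const sorted = [];
  while (heap.length > 0) {
    const [value, c, pos] = pop();
    sorted.push(value);
    // Replace the emitted value with the next one from the same chunk.
    if (pos + 1 < chunks[c].length) push([chunks[c][pos + 1], c, pos + 1]);
  }
  return sorted;
}

console.log(heapMerge([[1, 5, 7], [3, 9], [2, 8], []])); // [ 1, 2, 3, 5, 7, 8, 9 ]
```

The tutorial continues with chunkSort(), which is perfectly adequate for the handful of chunks used here.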
Lastly, run the sorted chunks through chunkSort() and report the final results.
const sortedSet = chunkSort(resultSet);
console.log('Sorted Array: ', sortedSet);
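Because the merge step is plain JavaScript, it can be checked locally before spending compute on DCP. The sketch below is a hypothetical dry run, not part of the app: it copies the helpers and chunkSort() from this tutorial, replaces the DCP job with an in-process sort of each chunk, and compares the merged output with a single Array.prototype.sort over the source set. isSorted is an illustrative helper name.

```javascript
// Local dry run of the pipeline with no DCP connection: the per-chunk sort
// that workers would perform is done in-process instead.
function generateInput(amount) {
  const array = [];
  for (let i = 0; i < amount; i += 1) array.push(Math.floor(Math.random() * 1000));
  return array;
}

function splitInput(arr, numChunks) {
  const chunks = [];
  for (let i = 0; i < numChunks; i += 1) chunks[i] = [];
  for (let i = 0; i < arr.length; i += 1) chunks[i % numChunks].push(arr[i]);
  return chunks;
}

// Same greedy merge as chunkSort() above (mutates its input).
function chunkSort(arr) {
  const sorted = [];
  while (arr.length) {
    let bestChunkIndex = 0;
    let bestValue = arr[0][0];
    for (let i = 1; i < arr.length; i += 1) {
      if (arr[i][0] < bestValue) {
        [bestValue] = arr[i];
        bestChunkIndex = i;
      }
    }
    sorted.push(arr[bestChunkIndex].shift());
    if (arr[bestChunkIndex].length === 0) arr.splice(bestChunkIndex, 1);
  }
  return sorted;
}

// True if every element is >= the one before it.
function isSorted(list) {
  return list.every((value, i) => i === 0 || list[i - 1] <= value);
}

const sourceSet = generateInput(13);
const sortedChunks = splitInput(sourceSet, 3).map((chunk) => [...chunk].sort((a, b) => a - b));
const sortedSet = chunkSort(sortedChunks);
const expected = [...sourceSet].sort((a, b) => a - b);

console.log(isSorted(sortedSet)); // true
console.log(JSON.stringify(sortedSet) === JSON.stringify(expected)); // true
```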
Run it
Run node your-file.js and watch as your work is distributed.
Full code
/* GENERATE INPUT
* Returns an array of 'amount' random numbers */
function generateInput(amount) {
const array = [];
for (let i = 0; i < amount; i += 1) {
array.push(Math.floor(Math.random() * 1000));
}
return array;
}
/* SPLIT INPUT
* Takes input array arr and splits it into numChunks different chunks */
function splitInput(arr, numChunks) {
const chunks = [];
for (let i = 0; i < numChunks; i += 1) chunks[i] = [];
for (let i = 0; i < arr.length; i += 1) chunks[i % numChunks].push(arr[i]);
return chunks;
}
async function main() {
const compute = require('dcp/compute');
const TOTAL = 13; // How many numbers to generate total
const CHUNKS = 3; // Total number of chunks
/* INPUT DATA */
const sourceSet = generateInput(TOTAL);
console.log('Source Set: ', sourceSet);
const inputSet = splitInput(sourceSet, CHUNKS);
console.log('Chunks Set: ', inputSet);
/* WORK FUNCTION
* Returns one sorted chunk per slice */
async function workFn(arr) {
progress();
return arr.sort(function (a, b) {
return a - b;
});
}
/* COMPUTE FOR */
const job = compute.for(inputSet, workFn);
job.public.name = 'dcp-sort';
// job.computeGroups = [{ joinKey: 'Your Key', joinSecret: 'Your Secret' }];
// Not mandatory console logs for status updates
job.on('accepted', () => {
console.log(` - Job accepted with id: ${job.id}`);
});
job.on('result', (ev) => {
console.log(` - Received result ${ev}`);
});
/* PROCESS RESULTS */
let resultSet = await job.exec();
resultSet = Array.from(resultSet);
console.log('Sorted Chunks: ', resultSet);
function chunkSort(arr) {
const sorted = [];
while (arr.length) {
let currentChunk = arr[0];
let bestChunkIndex = 0;
let bestValue = currentChunk[0];
for (let i = 1; i < arr.length; i += 1) {
currentChunk = arr[i];
if (currentChunk[0] < bestValue) {
[bestValue] = currentChunk;
bestChunkIndex = i;
}
}
sorted.push(arr[bestChunkIndex].shift());
if (arr[bestChunkIndex].length === 0) arr.splice(bestChunkIndex, 1);
}
return sorted;
}
const sortedSet = chunkSort(resultSet);
console.log('Sorted Array: ', sortedSet);
console.log(' - Job Complete');
}
require('dcp-client')
.init('https://scheduler.distributed.computer')
.then(main)
.catch(console.error)
.finally(process.exit);