dcp-sort

This app demonstrates how to sort values in a distributed manner using the Distributive Compute Protocol (DCP). If you already understand how DCP works and know its terminology, feel free to skip the refresher and jump straight to the setup.

A refresher


DCP is a distributed computing framework made from web-based technology.

Computers and devices in classrooms, computer labs, households, and enterprises can be turned into computing clusters with a single click with DCP. The computational workload, called a job, is subdivided into smaller parts called slices, which DCP executes in parallel. In this app, each slice corresponds to one chunk of the array that we wish to sort. Slices are sent to different Workers (for example, laptops, desktops, and cell phones), each of which contains sandboxes. A sandbox is a clean environment on a Worker that runs your work function.

DCP is a powerful parallel computing framework that allows users to express a computational job as effortlessly as:

job = compute.for(inputSet, workFunction);

resultSet = await job.exec();

A workFunction is mapped onto each element in an inputSet. Each mapping represents a slice. Slices are distributed across DCP networks for computation. Results are returned to the user from DCP networks coherently.
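To make the mapping idea concrete, here is a minimal local sketch with no DCP involved. The helper runLocally is hypothetical and is not part of dcp-client; it only mimics the shape of the model, where one call of the work function per input element stands in for one slice.

```javascript
// Local stand-in for the map step. `runLocally` is a hypothetical helper,
// not a DCP API: it applies the work function to each element in turn,
// whereas DCP would hand each element to a worker as a separate slice.
function runLocally(inputSet, workFunction) {
  return inputSet.map((element) => workFunction(element));
}

const inputSet = [1, 2, 3, 4];
const square = (x) => x * x;

const resultSet = runLocally(inputSet, square);
console.log(resultSet); // [ 1, 4, 9, 16 ]
```

The real compute.for() call has the same inputs, an input set and a work function, but the work runs on remote workers and the results come back asynchronously.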

Remember, the work function must call progress(). This call tells the scheduler that the job is still alive and running. It is the heartbeat of the job: without it, the job would die and no results would be returned.

  1. Create compute nodes: go to https://dcp.work on as many devices as you want and click Start to join the public Compute Group, or to dcp.work/joinKey if you have a private Compute Group joinKey and joinSecret.

  2. Configure dev environment: load dcp-client and any required packages.

  3. Specify the inputSet: an arbitrary, but enumerable input dataset (parameters, mp3 files, images, blender project file, etc)

  4. Specify the workFunction: an arbitrary work function (physics simulation, inference model, rendering process, etc)

  5. Express the job: Map the workFunction onto the inputSet with

    job = compute.for(inputSet, workFunction);
    
  6. (optional) Specify a Compute Group with

    job.computeGroups = [{ joinKey: 'name', joinSecret: 'passphrase' }];
    
  7. Await the resultSet: Execute the job in parallel on DCP via

    resultSet = await job.exec();
    

Setup

Create a directory for your project and install the DCP client package by running the following command in your terminal window.

npm i dcp-client

Next, write the following helper functions. Note that they’re app-specific, not critical to DCP.

  • generateInput() generates random numbers to sort.

  • splitInput() splits an array of numbers into different chunks, so that workers sort each chunk in parallel.

function generateInput(amount) {
  const array = [];
  for (let i = 0; i < amount; i += 1) {
    array.push(Math.floor(Math.random() * 1000));
  }
  return array;
}

function splitInput(arr, numChunks) {
  const chunks = [];
  for (let i = 0; i < numChunks; i += 1) chunks[i] = [];
  for (let i = 0; i < arr.length; i += 1) chunks[i % numChunks].push(arr[i]);
  return chunks;
}
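As a quick illustration of how splitInput() deals values out, the sketch below runs it on a small hand-picked array. The sample values are arbitrary; the function body is the same one shown above.

```javascript
// Same helper as above: deals values out to the chunks round-robin.
function splitInput(arr, numChunks) {
  const chunks = [];
  for (let i = 0; i < numChunks; i += 1) chunks[i] = [];
  for (let i = 0; i < arr.length; i += 1) chunks[i % numChunks].push(arr[i]);
  return chunks;
}

// Arbitrary sample values, chosen only for illustration.
const sample = [5, 3, 8, 1, 9, 2, 7];
const chunks = splitInput(sample, 3);
console.log(chunks); // [ [ 5, 1, 7 ], [ 3, 9 ], [ 8, 2 ] ]

// Asking for more chunks than there are values leaves some chunks empty.
const sparse = splitInput([1, 2], 3);
console.log(sparse); // [ [ 1 ], [ 2 ], [] ]
```

Values are assigned by index modulo the chunk count, so chunk sizes differ by at most one. Keeping the number of chunks no larger than the number of values avoids empty chunks.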

Next, create the asynchronous main function. The program requires the dcp-client module to initialize access to the Compute API.

async function main() {
  const compute = require('dcp/compute');
  /* INPUT DATA */

  /* WORK FUNCTION */

  /* COMPUTE FOR */

  /* PROCESS RESULTS */
}

require('dcp-client').init('https://scheduler.distributed.computer').then(main);

Input data

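Within main(), define how many numbers to generate and how many chunks to split them into, then use the helper functions to generate random input and split it into chunks.

/* INPUT DATA */
const TOTAL = 13; // How many numbers to generate in total
const CHUNKS = 3; // Total number of chunks
const sourceSet = generateInput(TOTAL);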
console.log('Source Set: ', sourceSet);
const inputSet = splitInput(sourceSet, CHUNKS);
console.log('Chunks Set: ', inputSet);

Work function

Still within main(), define the work function. DCP sends each worker one element of the input set, and the worker runs the work function on it. In this app, the work function sorts a chunk and returns it.

/* WORK FUNCTION
 * Returns one sorted chunk per slice */
async function workFn(arr) {
  progress();
  return arr.sort(function (a, b) {
    return a - b;
  });
}
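The comparator in the work function matters: without one, Array.prototype.sort() compares elements as strings. The sketch below shows the difference and tries the work function locally. The progress stub is an assumption made only so the function can run outside a worker, where no progress() exists.

```javascript
// Local stub, for testing outside DCP only. On a worker the sandbox is
// expected to supply progress(); this no-op merely stands in for it.
globalThis.progress = () => {};

async function workFn(arr) {
  progress();
  return arr.sort((a, b) => a - b);
}

// Default sort compares string representations, so "100" sorts before "9".
const defaultOrder = [10, 9, 1, 100].sort();
console.log(defaultOrder); // [ 1, 10, 100, 9 ]

// A numeric comparator gives the order we actually want.
const numericOrder = [10, 9, 1, 100].sort((a, b) => a - b);
console.log(numericOrder); // [ 1, 9, 10, 100 ]

// The work function is async, so its result arrives through a promise.
workFn([10, 9, 1, 100]).then((sorted) => console.log(sorted));
```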

compute.for

Now create the DCP job by calling compute.for(). When the job runs, DCP distributes the elements of the input set to workers in parallel, and each worker executes the work function on the element it receives.

/* COMPUTE FOR */
const job = compute.for(inputSet, workFn);
job.public.name = 'dcp-sort - Node';

Compute Group

If you are using a Compute Group, add the following line with the join information for that group. Skip this section if you won’t be using a Compute Group.

// SKIP IF: you don't need a Compute Group
job.computeGroups = [{ joinKey: 'Your Key', joinSecret: 'Your Secret' }];

Process results

First, await the results to return from job.exec(). Each element of the results is a sorted chunk of numbers.

/* PROCESS RESULTS */
// job.exec() returns a promise, so await it before reading the results
let resultSet = await job.exec();
resultSet = Array.from(resultSet);
console.log('Sorted Chunks: ', resultSet);

The last step is to merge these chunks into one sorted list. Since each chunk is already sorted, a greedy approach works well. The following function looks at the first element of each chunk (always that chunk's smallest value), finds the chunk whose first element is lowest, and moves that value from the front of the chunk onto the end of the main sorted array. A chunk is dropped as soon as it is empty; once no chunks remain, the final array is complete and sorted.

The function chunkSort() handles this, and it’s not DCP-specific.

function chunkSort(arr) {
  const sorted = [];
  while (arr.length) {
    let currentChunk = arr[0];
    let bestChunkIndex = 0;
    let bestValue = currentChunk[0];
    for (let i = 1; i < arr.length; i += 1) {
      currentChunk = arr[i];
      if (currentChunk[0] < bestValue) {
        [bestValue] = currentChunk;
        bestChunkIndex = i;
      }
    }
    sorted.push(arr[bestChunkIndex].shift());
    if (arr[bestChunkIndex].length === 0) arr.splice(bestChunkIndex, 1);
  }
  return sorted;
}
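Here is a small usage sketch for chunkSort() on hand-written, already sorted chunks. The sample values are arbitrary. Because the function empties the array it is given, the example passes in a copy, and it filters out empty chunks first, since an empty chunk would otherwise push undefined into the result.

```javascript
// Same merge function as above.
function chunkSort(arr) {
  const sorted = [];
  while (arr.length) {
    let currentChunk = arr[0];
    let bestChunkIndex = 0;
    let bestValue = currentChunk[0];
    for (let i = 1; i < arr.length; i += 1) {
      currentChunk = arr[i];
      if (currentChunk[0] < bestValue) {
        [bestValue] = currentChunk;
        bestChunkIndex = i;
      }
    }
    sorted.push(arr[bestChunkIndex].shift());
    if (arr[bestChunkIndex].length === 0) arr.splice(bestChunkIndex, 1);
  }
  return sorted;
}

// Arbitrary sample chunks; each is already sorted, and one is empty.
const sortedChunks = [[1, 5, 7], [3, 9], [], [2, 8]];

// Drop empty chunks and copy the rest, so sortedChunks itself is untouched.
const safeCopy = sortedChunks
  .filter((chunk) => chunk.length > 0)
  .map((chunk) => [...chunk]);

const merged = chunkSort(safeCopy);
console.log(merged); // [ 1, 2, 3, 5, 7, 8, 9 ]
console.log(sortedChunks.length); // 4, the original is unchanged
```

Each pass scans the first element of every remaining chunk, which is fine for a handful of chunks. With many chunks, a heap-based merge would be a common alternative.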

Lastly, run the sorted chunks through chunkSort, and report the final results.

const sortedSet = chunkSort(resultSet);
console.log('Sorted Array: ', sortedSet);

Run it

Run node your-file.js, replacing your-file.js with the name of your script, and watch as your work is distributed.

Full code

/* GENERATE INPUT
 * Returns an array of 'amount' random numbers */
function generateInput(amount) {
  const array = [];
  for (let i = 0; i < amount; i += 1) {
    array.push(Math.floor(Math.random() * 1000));
  }
  return array;
}

/* SPLIT INPUT
 * Takes input array arr and splits it into numChunks different chunks */
function splitInput(arr, numChunks) {
  const chunks = [];
  for (let i = 0; i < numChunks; i += 1) chunks[i] = [];
  for (let i = 0; i < arr.length; i += 1) chunks[i % numChunks].push(arr[i]);
  return chunks;
}

async function main() {
  const compute = require('dcp/compute');
  const TOTAL = 13; // How many numbers to generate total
  const CHUNKS = 3; // Total number of chunks

  /* INPUT DATA */
  const sourceSet = generateInput(TOTAL);
  console.log('Source Set: ', sourceSet);
  const inputSet = splitInput(sourceSet, CHUNKS);
  console.log('Chunks Set: ', inputSet);

  /* WORK FUNCTION
   * Returns one sorted chunk per slice */
  async function workFn(arr) {
    progress();
    return arr.sort(function (a, b) {
      return a - b;
    });
  }

  /* COMPUTE FOR */
  const job = compute.for(inputSet, workFn);
  job.public.name = 'dcp-sort';
  // job.computeGroups = [{ joinKey: 'Your Key', joinSecret: 'Your Secret' }];

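  // Optional console logs for status updates
  job.on('accepted', () => {
    console.log(` - Job accepted with id: ${job.id}`);
  });
  job.on('result', (ev) => {
    // Pass the event as its own argument so the object's contents are
    // printed rather than the string "[object Object]"
    console.log(' - Received result', ev);
  });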

  /* PROCESS RESULTS */
  let resultSet = await job.exec();
  resultSet = Array.from(resultSet);
  console.log('Sorted Chunks: ', resultSet);

  function chunkSort(arr) {
    const sorted = [];
    while (arr.length) {
      let currentChunk = arr[0];
      let bestChunkIndex = 0;
      let bestValue = currentChunk[0];

      for (let i = 1; i < arr.length; i += 1) {
        currentChunk = arr[i];
        if (currentChunk[0] < bestValue) {
          [bestValue] = currentChunk;
          bestChunkIndex = i;
        }
      }

      sorted.push(arr[bestChunkIndex].shift());
      if (arr[bestChunkIndex].length === 0) arr.splice(bestChunkIndex, 1);
    }
    return sorted;
  }

  const sortedSet = chunkSort(resultSet);
  console.log('Sorted Array: ', sortedSet);
  console.log(' - Job Complete');
}

require('dcp-client')
  .init('https://scheduler.distributed.computer')
  .then(main)
  .catch(console.error)
  .finally(process.exit);