dSort

A refresher

This application demonstrates how to sort values in a distributed manner using the Distributed Compute Protocol (DCP). DCP is a distributed computing framework made from web-based technology.

Computers and devices in classrooms, computer labs, households, and enterprises are turned into computing clusters with a single click with DCP. The entire workload, a job, is sub-divided into smaller parts, called slices. Slices are executed in parallel using DCP. Each slice of the job will correspond to a value in an array that we wish to sort. Slices will be sent to different Workers (for example, laptops, desktops, cell phones, etc.) which contain sandboxes. A sandbox is a clean environment on your Worker that runs your work function.

DCP is a powerful parallel computing framework that allows users to express a computational job as effortlessly as:

job = compute.for(inputSet, workFunction)

resultSet = await job.exec()

A workFunction is mapped onto each element in an inputSet. Each mapping represents a slice. Slices are distributed across DCP networks for computation. Results are returned to the user from DCP networks coherently.

Remember, the work function must contain progress();. Progress is a call made to tell the scheduler that the job is still alive and running. Progress is considered the heart beat of the job, without it the job would die and no results would be returned.

  1. Create compute nodes: go to https://dcp.work on as many devices as you want and click Start to join the public compute group, or to dcp.work/joinKey if you have a private compute group joinKey and joinSecret.

  2. Configure dev environment: load dcp-client and any required packages.

  3. Specify the inputSet: an arbitrary, but enumerable input dataset (parameters, mp3 files, images, blender project file, etc)

  4. Specify the workFunction: an arbitrary work function (physics simulation, inference model, rendering process, etc)

  5. Express the job: Map the workFunction onto the inputSet with job = compute.for(inputSet, workFunction)

  6. (optional) Specify a compute group with job.computeGroups=[{joinKey: name, joinSecret: password}]

  7. Await the resultSet: Execute the job in parallel on DCP via resultSet = await job.exec()

An example of a distributed brute force sorting algorithm:

First, sign up for a DCP account at https://portal.distributed.computer. After signing up, download the id.keystore and default.keystore associated with your account. Refer to this tutorial for more information on how to download your keystores.

Setup

Create a directory for your project and download the DCP packages by running the below command in your terminal window.

npm i dcp-client

Next, make a JavaScript file and create the asynchronous main function. The dSort program requires the dcp-client module to initialize access to the Compute API.

async function main() {
  const compute = require('dcp/compute');
  // Rest of our code will go in the following sections:
  /* INPUT DATA */

  /* WORK FUNCTION */

  /* COMPUTE FOR */

  /* PROCESS RESULTS */
}

require('dcp-client').init('https://scheduler.distributed.computer').then(main);

Input data

Within main(), the input data is an array with length NUM containing integers generated using Math.random. Feel free to replace this section with your own input data.

/* INPUT DATA */
// Generate NUM random numbers between -49 and +50
const inputSet = [];
const outputSet = [];
const NUM = 50;

for (let i = 0; i < NUM; i += 1) {
  inputSet[i] = Math.floor(Math.random() * 100) - 49;
}

Work function

Still within main(), this is the function that each Worker runs in parallel. DCP sends each value from the inputSet to a sandbox. The work function then compares the inputSet value to the rest of the array. Finally, the work function returns the index of the inputSet value sorted within the outputSet.

/* WORK FUNCTION */
// Run by each sandbox for one value from the set
function workFn(value, set) {
  let index = 0;
  for (let j = 0; j < set.length; j += 1) {
    progress();
    if (set[value] > set[j]) index += 1;
  }
  return index;
}

compute.for

Now to set up the job. It’s passed an iterable that goes from 0 to inputSet.length - 1 so that DCP can send each value in the input to one sandbox. Passing [arguments] sends the inputSet to each sandbox. Then assign a public name for the job.

/* COMPUTE FOR */
// Set up compute.for with an iterable, a work function, and any arguments.
const job = compute.for(0, inputSet.length - 1, workFn, [inputSet]);
job.public.name = 'dSort';

Compute group

If using a compute group, enter the below line with the join information for the specific group. Skip this section if you won’t be using a compute group.

// SKIP IF: you don't need a compute group
job.computeGroups = [{ joinKey: 'Your Key', joinSecret: 'Your Key' }];

Process results

Next, dSort waits for the results to come back. The resultSet is an array with indices corresponding to the indices of the input set. The values of the array correspond to the indices of the output.

/* PROCESS RESULTS */
let resultSet = await job.exec();
resultSet = Array.from(resultSet);

Use the results to fill the sorted array, outputSet. In general, outputSet[resultSet[i]] = inputSet[i]. Two(+) inputs of the same value have the same output index. To account for having two(+) inputs of the same value, the dSort algorithm pushes forward an index when it encounters an index that already has the duplicate value.

// Creates sorted list from results (non-distributed)
for (let i = 0; i < resultSet.length; i += 1) {
  // First instance of value
  if (outputSet[resultSet[i]] === undefined) {
    outputSet[resultSet[i]] = inputSet[i];
  } else {
    // Further duplicates
    let next = 1;
    while (outputSet[resultSet[i] + next] !== undefined) {
      next += 1;
    }
    outputSet[resultSet[i] + next] = inputSet[i];
  }
}

Lastly, the program logs the results to the console.

console.log(inputSet);
console.log(outputSet);
console.log('Job Complete');

Run it

Run node your-file.js, and watch as your work distributes.

Full code

Click to see full code.
async function main() {
  const compute = require('dcp/compute');
  const inputSet = [];
  const outputSet = [];
  const NUM = 100;

  // Fill input set with NUM random numbers
  for (let i = 0; i < NUM; i += 1) {
    inputSet[i] = Math.floor(Math.random() * 100) - 49;
  }

  // Distributed Work Function
  // Each slice loops through the dataset and counts values below current slice value
  async function workFn(value, set) {
    let index = 0;
    for (let j = 0; j < set.length; j += 1) {
      progress();
      if (set[value] > set[j]) index += 1;
    }
    return index;
  }

  // Set up compute.for with an iterable, a work function, and any arguments.
  const job = compute.for(0, inputSet.length - 1, workFn, [inputSet]);
  job.public.name = 'dSort';

  // SKIP IF: you don't need a compute group
  // job.computeGroups = [{ joinKey: 'Your Key', joinSecret: 'Your Key' }];

  // Not mandatory console logs for status updates
  job.on('accepted', () => {
    console.log(` - Job accepted with id: ${job.id}`);
  });
  job.on('result', (ev) => {
    console.log(` - Received result ${ev}`);
  });

  let resultSet = await job.exec();
  resultSet = Array.from(resultSet);

  // Creates sorted list from results (non-distributed)
  for (let i = 0; i < resultSet.length; i += 1) {
    // First instance of value
    if (outputSet[resultSet[i]] === undefined) {
      outputSet[resultSet[i]] = inputSet[i];
    } else {
      // Further duplicates
      let next = 1;
      while (outputSet[resultSet[i] + next] !== undefined) {
        next += 1;
      }
      outputSet[resultSet[i] + next] = inputSet[i];
    }
  }

  console.log(inputSet);
  console.log(outputSet);
  console.log(' - Job Complete');
}

require('dcp-client').init('https://scheduler.distributed.computer').then(main);