Deploying jobs with remote input data

If you want to deploy a job with a large amount of input data, you may want to avoid uploading it to the scheduler, from which the workers executing slices of your job would otherwise fetch it. Instead, you can skip the intermediary and have the workers fetch the input data directly from a remote location, specified as a RemoteDataSet: an array of URL objects.

To deploy a job whose input data is specified with URL objects, the web server must serialize (stringify) the data before sending it to workers. There are two ways to do this:

  • JSON: A common method for serializing data in network requests.

  • KVIN: A comprehensive library that serializes JavaScript types for transmission over a network. It co-exists peacefully with JSON but supports more data types (for example, Typed Arrays and values with circular references). KVIN is a Content-Type that DCP workers understand precisely so that they can fetch such complex data types.

    To tell workers that the response is KVIN-encoded, set the Content-Type header when you send kvin.serialize(inputData) (for example, in an Express app: res.header("Content-Type", "application/x-kvin")).

Note that setting the Access-Control-Allow-Headers and Access-Control-Allow-Origin headers is also necessary so that workers running in web browsers can fetch the data.

Input data

The input data in this example is an array of URL objects. For demonstration purposes, each URL serves a basic data structure ({ x: 1, y: 2 }). The first snippet below serves it as JSON; the second serves it as KVIN.

const express = require('express');

// Serve the input data as JSON (the default serialization).
const port = 12345;
const app = express();

/**
 * Respond to GET / with the input data. The CORS headers let workers
 * running in web browsers fetch it; res.send serializes the object as JSON.
 */
function serveInputData(req, res) {
  res.set({
    'Access-Control-Allow-Headers': 'content-type',
    'Access-Control-Allow-Origin': '*',
  });
  res.send({ x: 1, y: 2 });
}

app.get('/', serveInputData);
app.listen(port, () => console.log(`port ${port} is ready!`));

// The remote input set: one URL per slice.
const inputSet = [new URL(`http://localhost:${port}/`)];
const express = require('express');
const kvin = require('kvin');

// Serve the input data as KVIN instead of JSON.
const port = 12345;
const app = express();

/**
 * Respond to GET / with the KVIN-serialized input data. The Content-Type
 * header tells DCP workers to decode the body with KVIN; the CORS headers let
 * workers running in web browsers fetch it.
 */
function serveKvinInputData(req, res) {
  res.set({
    'Access-Control-Allow-Headers': 'content-type',
    'Access-Control-Allow-Origin': '*',
    'Content-Type': 'application/x-kvin',
  });
  const payload = kvin.serialize({ x: 1, y: 2 });
  res.send(payload);
}

app.get('/', serveKvinInputData);
app.listen(port, () => console.log(`port ${port} is ready!`));

// The remote input set: one URL per slice.
const inputSet = [new URL(`http://localhost:${port}/`)];

compute.for

// Create the job, passing the array of URL objects as the input set so
// workers fetch each slice's data directly from those URLs.
// NOTE(review): assumes `compute` (require('dcp/compute')) and `workFn` are
// defined as in the full example below.
const job = compute.for(inputSet, workFn);

Worker

To allow workers to fetch specific URLs, you need to add each URL's origin to the worker's configured allowOrigins list.

  • Node.js worker: Append -a http://localhost:12345 to the end of the command line that starts the worker.

  • Web browser worker: In the browser console, run dcpConfig.worker.allowOrigins.any.push('http://localhost:12345')

Full code

Click to see full code
const express = require('express');

/**
 * Start an Express server that serves `inputData` at GET / on `port`.
 * The CORS headers let workers running in web browsers fetch the data.
 *
 * @param {number} port - TCP port to listen on.
 * @param {object} inputData - Value sent as the response body (JSON).
 * @returns {import('http').Server} The listening HTTP server, so the caller
 *   can close it once the job is done.
 */
function startInputServer(port, inputData) {
  // One app per port: two GET '/' handlers on a single app would always be
  // answered by the first one.
  const app = express();
  app.get('/', (req, res) => {
    res.header('Access-Control-Allow-Headers', 'content-type');
    res.header('Access-Control-Allow-Origin', '*');
    res.send(inputData);
  });
  return app.listen(port, () => {
    console.log(`port ${port} is ready!`);
  });
}

/**
 * Serve two input datasets over HTTP, deploy a DCP job whose input set is the
 * two URLs, log job events, print the results, then shut the servers down.
 *
 * Workers must have both http://localhost:12345 and http://localhost:12346
 * in their allowOrigins list to fetch the data.
 */
async function main() {
  const compute = require('dcp/compute');

  /* INPUT SET */
  const port1 = 12345;
  const port2 = 12346;
  const servers = [
    startInputServer(port1, { x: 1, y: 2 }),
    startInputServer(port2, { x: 101, y: 201 }),
  ];

  const inputSet = [
    new URL(`http://localhost:${port1}/`),
    new URL(`http://localhost:${port2}/`),
  ];

  /* WORK FUNCTION */
  // Executed by workers, not in this process. The loop only simulates work
  // and reports progress; the fetched data is returned unchanged.
  async function workFn(data) {
    progress();
    let sum = 0;
    for (let i = 0; i < 10000000; i += 1) {
      progress(i / 10000000);
      sum += Math.random();
    }
    return data;
  }

  // Create new job.
  const job = compute.for(inputSet, workFn);
  job.public.name = 'dataURL-example';

  // Initialized here (rather than left as an implicit global) so the elapsed
  // time is always a number; reset once the scheduler accepts the job.
  let startTime = Date.now();

  // Adding Job Listeners.
  job.on('accepted', () => {
    console.log(` - Job accepted by scheduler, waiting for results`);
    console.log(` - Job has address ${job.id}`);
    startTime = Date.now();
  });

  job.on('readystatechange', (arg) => {
    console.log(`new ready state: ${arg}`);
  });

  job.on('result', (ev) => {
    console.log(
      ` - Received result for slice ${ev.sliceNumber} at ${
        Math.round((Date.now() - startTime) / 100) / 10
      }s`,
    );
    console.log(
      ` * Wow! ${JSON.stringify(ev.result)} is such a pretty object!`,
    );
  });

  // SKIP IF: you do not need a compute group
  // job.computeGroups = [{ joinKey: 'KEY', joinSecret: 'SECRET' }];

  /* PROCESS RESULTS */
  try {
    const resultSet = Array.from(await job.exec());
    // The results are objects; JSON shows their contents instead of
    // "[object Object]".
    console.log(JSON.stringify(resultSet));
  } finally {
    // Close the input servers so the process can exit, even if the job failed.
    servers.forEach((server) => server.close());
  }
}

// Initialize the DCP client against the public scheduler, then run main().
// A rejection from either step is reported on stderr.
const dcpClient = require('dcp-client');
const SCHEDULER_URL = 'https://scheduler.distributed.computer';

dcpClient
  .init(SCHEDULER_URL)
  .then(main)
  .catch((error) => console.error(error));