Basic DCP Job Tutorial (Python)
This tutorial demonstrates how to create and run a simple DCP Job using Python. You will learn how to:
Define an input set of data to be processed in parallel.
Write a worker function that operates on each element.
Configure a Job, including public metadata and compute groups.
Attach event listeners to monitor job progress, results, and errors.
Execute the Job and post-process the results.
The example converts a string of lowercase characters into uppercase letters using the Distributive Compute Platform (DCP), giving you a hands-on introduction to distributed computation.
💡 The complete code is available in the Full Code section at the bottom of this page.
A more advanced tutorial will follow, covering topics such as:
Specifying slice payment offers and alternate payment accounts.
Working with remote datasets.
Deploying jobs across multiple compute groups in federated networks.
This tutorial focuses on the core building blocks so you can get a Job running quickly and safely.
Requirements
id.keystore and default.keystore in ~/.dcp (see API keys)
pip install dcp
DCP Job
Create a Python file called to-upper-case.py. The program requires the dcp module to initialize access to the Compute API.
Initialize DCP
import dcp
dcp.init()
By default, dcp.init() is equivalent to:
dcp.init('https://scheduler.distributed.computer')
This connects to the public DCP scheduler. Additional schedulers may be available in the future, allowing job deployers to select a specific target scheduler.
Input Set
The input set is a list of enumerable values (e.g., integers, strings, objects, images). Each element in the array is treated as an independent input datum and processed in parallel by DCP workers.
In this example, the input set is an array of characters derived from the string "yelling!".
input_set = list('yelling!')
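As a quick local check (plain Python, no DCP connection needed), you can confirm that list() splits the string into one character per element:

```python
# list() on a string yields one element per character; each character
# will become an independent input datum for the job.
input_set = list('yelling!')
print(input_set)  # ['y', 'e', 'l', 'l', 'i', 'n', 'g', '!']
```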
Work Function
The work function defines the computation applied to each element of the input set. DCP serializes this function and distributes it to Workers, where it is executed independently for each input datum in parallel.
Each Worker receives a single input element and returns a corresponding result. Because execution is isolated per input, the work function should be deterministic and self-contained.
In this example, each Worker receives one character and converts it to uppercase:
def work_function(letter):
    dcp.progress()
    return letter.upper()
letter is one element from the input set.
dcp.progress() reports task progress back to the scheduler.
The return value becomes one element in the final results array.
When the job completes, the results are returned in the same order as the original input set.
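To build intuition for this execution model, here is a local sketch (plain Python, with a thread pool standing in for DCP Workers; this is not how DCP actually executes jobs) showing each element being transformed independently while the collected results keep input order:

```python
# Local illustration only: a thread pool mimics per-element, independent
# execution. On DCP, each call would run on a remote Worker instead.
from concurrent.futures import ThreadPoolExecutor

def work_function(letter):
    # On DCP this would also call dcp.progress(); omitted locally.
    return letter.upper()

input_set = list('yelling!')
with ThreadPoolExecutor(max_workers=4) as pool:
    # Executor.map yields results in input order, just as DCP collates
    # its final results array in the original input order.
    results = list(pool.map(work_function, input_set))

print(''.join(results))  # YELLING!
```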
Job
A Job represents the workload to be executed across the DCP network. Each element of the input set is packaged as a Job slice and processed independently by Workers in parallel. Jobs encapsulate the input, the work function, and optional configuration such as compute groups, public metadata, and event handlers.
job = dcp.compute_for(input_set, work_function)
At this point, the job object can be further configured before execution (see Job Configuration below).
Job Configuration
After creating a job with compute_for(...), you can configure its behavior and metadata by setting properties on the job object before calling job.exec().
Public Information
You may attach optional, publicly visible metadata to your job. This information can be displayed in dashboards or monitoring tools and helps identify the purpose of the job.
job.public.name = 'toUpperCase'
job.public.description = 'Minimal demonstration of a distributed job'
job.public.link = 'https://distributive.network'
name - A short identifier for the job
description - A brief explanation of what the job does
link - Optional reference URL for additional context
This metadata does not affect execution behavior; it is informational only.
Compute Groups
DCP jobs can be deployed to one or more compute groups. A compute group may be public (such as the Global DCP group) or private. To deploy into a private group, you must have the appropriate join credentials.
Security note: Never hard-code joinSecret values in production code. Prompt for them at runtime or load them securely from environment variables. Any hard-coded secrets shown in examples are for demonstration purposes only.
If no compute group is specified, the job is deployed to the public Global DCP group by default.
Deploy to a Private Compute Group
job.computeGroups = [
    {'joinKey': '<key>', 'joinSecret': '<secret>'}
]
Deploy to Multiple Private Compute Groups
job.computeGroups = [
    {'joinKey': '<key1>', 'joinSecret': '<secret1>'},
    {'joinKey': '<key2>', 'joinSecret': '<secret2>'},
    {'joinKey': '<key3>', 'joinSecret': '<secret3>'},
]
Deploy to Both Public and Private Groups
To deploy to a private group and the public Global DCP group:
job.computeGroups = [
    {'joinKey': '<key>', 'joinSecret': '<secret>'},
    {'joinKey': 'public'}
]
Each listed group becomes eligible to execute slices of the job.
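Following the security note above, one way to avoid hard-coding credentials is to read them from environment variables at runtime. This is a sketch only; the variable names DCP_JOIN_KEY and DCP_JOIN_SECRET are assumptions for illustration, not part of the DCP API:

```python
import os

# Hypothetical environment variable names -- adjust for your deployment.
join_key = os.environ.get('DCP_JOIN_KEY', 'public')
join_secret = os.environ.get('DCP_JOIN_SECRET')

group = {'joinKey': join_key}
if join_secret is not None:
    group['joinSecret'] = join_secret  # only private groups need a secret

compute_groups = [group]
# Attach to the job before calling job.exec(), e.g.:
# job.computeGroups = compute_groups
print(compute_groups)
```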
Event Listeners
DCP Jobs support a variety of optional event listeners that let you monitor execution, track results, and handle errors in real time. Attach listeners to the job object before calling job.exec().
Ready State Change
Fires whenever the job's ready state changes. States typically include: exec, init, preauth, deploying, listeners, compute-groups, uploading, and deployed.
job.on('readystatechange', lambda s: print(f"Ready State: {s}"))
Accepted
Fires when the job is accepted by the DCP Scheduler. At this point, the job is assigned a unique ID that can be displayed or logged.
job.on('accepted', lambda _: print(f" Job ID: {job.id}\n Awaiting results..."))
Result
Fires whenever a Worker returns a result. These events may arrive out of order relative to the input set. The final output from job.exec() is collated in the original input order.
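As a local illustration (plain Python, with simulated event payloads modeled on the example output shown later in this tutorial), out-of-order results can be collated by sliceNumber:

```python
# Simulated 'result' event payloads, arriving out of order.
events = [
    {'sliceNumber': 3, 'result': 'L'},
    {'sliceNumber': 1, 'result': 'Y'},
    {'sliceNumber': 2, 'result': 'E'},
]

# Sort by sliceNumber to recover input order -- the same ordering the
# final results from job.exec()/job.wait() are collated into.
ordered = [e['result'] for e in sorted(events, key=lambda e: e['sliceNumber'])]
print(ordered)  # ['Y', 'E', 'L']
```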
job.on('result', lambda r: print(json.dumps(r, indent=4).replace('\\n', '\n')))
Error
Fires whenever a Worker encounters an error while processing a job slice. You can use this to debug or handle failures gracefully.
job.on('error', lambda e: print(json.dumps(e, indent=4).replace('\\n', '\n')))
Console
Allows capturing print() statements from Workers. Since Worker code runs remotely, its output isn't visible locally unless you listen for this event. Useful for debugging distributed computations.
job.on('console', lambda c: print(json.dumps(c, indent=4).replace('\\n', '\n')))
No Funds
Fires when the account paying for the job does not have sufficient funds to continue processing. The job is paused until the account is topped up.
job.on('nofunds', lambda n: print(json.dumps(n, indent=4).replace('\\n', '\n')))
Stop
Fires when the job is stopped manually or via the scheduler. In most cases, this is optional, since job.exec() resolves when all results are returned.
job.on('stop', lambda p: print(json.dumps(p, indent=4).replace('\\n', '\n')))
Job Execution
Execute the Job by calling job.exec(). This submits the Job to the scheduler but does not block execution.
job.exec()
results = job.wait()
job.exec() is non-blocking: it deploys the Job to the scheduler and returns immediately.
job.wait() is blocking: it waits until all Job slices have completed and then returns the final results object.
The value returned from job.wait() contains the collated results in input order.
By default:
The Job is associated with the identity stored in id.keystore, located in ~/.dcp.
If no additional arguments are provided, job.exec() will use default.keystore to pay for the Job with Compute Credits at the current marketRate.
Advanced topics such as using alternate payment accounts, customizing slice payment offers, and remote datasets will be covered in the next tutorial.
Result Post-Processing
Once execution completes, you can process the returned results as needed. In this example, we concatenate the characters and print the final string.
print(''.join(results))
This produces the fully reconstructed uppercase string.
Run It
Ensure you have installed dependencies and have your API keys in ~/.dcp.
1. From your terminal, run the script:
python to-upper-case.py
2. Observe the job submission and execution:
Job events (accepted, results, errors) will be logged in real time.
Once complete, the final uppercase string will be printed to the terminal.
3. Example terminal output:
$ python3 to-upper-case.py
Ready State: exec
Ready State: init
Ready State: preauth
Ready State: deploying
Ready State: listeners
Ready State: compute-groups
Ready State: uploading
Ready State: deployed
Job ID: jmryXjBSVYUEVF6kfm34tW
Awaiting results...
{'job': 'jmryXjBSVYUEVF6kfm34tW', 'sliceNumber': 1.0, 'result': 'Y'}
{'job': 'jmryXjBSVYUEVF6kfm34tW', 'sliceNumber': 3.0, 'result': 'L'}
{'job': 'jmryXjBSVYUEVF6kfm34tW', 'sliceNumber': 2.0, 'result': 'E'}
{'job': 'jmryXjBSVYUEVF6kfm34tW', 'sliceNumber': 5.0, 'result': 'I'}
{'job': 'jmryXjBSVYUEVF6kfm34tW', 'sliceNumber': 4.0, 'result': 'L'}
{'job': 'jmryXjBSVYUEVF6kfm34tW', 'sliceNumber': 6.0, 'result': 'N'}
{'job': 'jmryXjBSVYUEVF6kfm34tW', 'sliceNumber': 7.0, 'result': 'G'}
{'job': 'jmryXjBSVYUEVF6kfm34tW', 'sliceNumber': 8.0, 'result': '!'}
YELLING!
This demonstrates how DCP distributes computation across workers, even for simple tasks like converting characters to uppercase.
Full Code
The following example combines all the concepts covered in this tutorial: input set definition, work function, job configuration, compute groups, event handling, execution, and result post-processing. Running this code will convert the string 'yelling!' to uppercase using DCP Workers.
import json
import dcp
dcp.init()
# INPUT SET
input_set = list('yelling!')
# WORK FUNCTION
def work_function(letter):
dcp.progress()
return letter.upper()
# COMPUTE FOR
job = dcp.compute_for(input_set, work_function)
# COMPUTE GROUPS
job.computeGroups = [
    {'joinKey': 'demo', 'joinSecret': 'dcp'},
    {'joinKey': 'public'}
]
# PUBLIC INFO
job.public.name = 'to-upper-case'
job.public.description = 'Minimal demonstration of a distributed job'
job.public.link = 'https://distributive.network'
# EVENTS
job.on('readystatechange', lambda s: print(f"Ready State: {s}"))
job.on('accepted', lambda _: print(f" Job ID: {job.id}\n Awaiting results..."))
job.on('noProgress', lambda n: print(json.dumps(n, indent=4).replace('\\n', '\n')))
job.on('error', lambda e: print(json.dumps(e, indent=4).replace('\\n', '\n')))
job.on('nofunds', lambda n: print(json.dumps(n, indent=4).replace('\\n', '\n')))
job.on('result', lambda r: print(json.dumps(r, indent=4).replace('\\n', '\n')))
# EXECUTION
job.exec()
results = job.wait()
# RESULT POST-PROCESSING
print(''.join(results))
When executed, the job is submitted to the DCP scheduler, distributed across workers, and the final uppercase string is printed to the console. You can observe job events such as acceptance, results, and errors in real time.
This complete example provides a foundation you can modify to experiment with different input sets, work functions, and compute group configurations.