Ray: Distributed Computing for All, Part 1


This is the first in a two-part series on distributed computing using Ray. This part shows how to use Ray on your local PC, and part 2 shows how to scale Ray to multi-server clusters in the cloud.

You’ve just gotten a brand new 16-core laptop or desktop, and you’re eager to test its power with some heavy computations.

You’re a Python programmer, though not an expert yet, so you open your favourite LLM and ask it something like this.

“I would like to count the number of prime numbers within a given input range. Please give me some Python code for this.”

After a few seconds, the LLM gives you some code. You tweak it a bit through a brief back-and-forth, and eventually you end up with something like this:

import math, time, os

def is_prime(n: int) -> bool:
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    r = int(math.isqrt(n)) + 1
    for i in range(3, r, 2):
        if n % i == 0:
            return False
    return True

def count_primes(a: int, b: int) -> int:
    c = 0
    for n in range(a, b):
        if is_prime(n):
            c += 1
    return c

if __name__ == "__main__":
    A, B = 10_000_000, 20_000_000
    total_cpus = os.cpu_count() or 1

    # Start "chunky"; we will sweep this later
    chunks = max(4, total_cpus * 2)
    step = (B - A) // chunks

    print(f"CPUs~{total_cpus}, chunks={chunks}")
    t0 = time.time()
    results = []
    for i in range(chunks):
        s = A + i * step
        e = s + step if i < chunks - 1 else B
        results.append(count_primes(s, e))
    total = sum(results)
    print(f"total={total}, time={time.time() - t0:.2f}s")

You run the program and it works perfectly. The only problem is that it takes quite a bit of time to run, possibly thirty to sixty seconds, depending on the size of your input range. That’s probably unacceptable.

What do you do now? You have several options, with the three most common probably being:
– Parallelise the code using threads or multiprocessing
– Rewrite the code in a “fast” language like C or Rust
– Try a library like Cython, Numba or NumPy

These are all viable options, but each has disadvantages. Options 1 and 3 significantly increase your code complexity, and the middle option may require you to learn a new programming language.

What if I told you there was another way? One where the changes required to your existing code would be kept to an absolute minimum. One where your workload is automatically spread across all of your available cores.

That’s precisely what the third-party Ray library promises to do.

What's Ray?

The Ray Python library is an open-source distributed computing framework designed to make it easy to scale Python programs from a laptop to a cluster with minimal code changes.

Ray makes it easy to scale and distribute compute-intensive application workloads — from deep learning to data processing — across clusters of remote computers, while also delivering practical runtime improvements on your laptop, desktop, or even a remote cloud-based compute cluster.

Ray provides a rich set of libraries and integrations built on a flexible distributed execution framework, making distributed computing easy and accessible to all.

In short, Ray lets you parallelise and distribute your Python code with minimal effort, whether it’s running locally on a laptop or on a huge cloud-based cluster.

Using Ray

In the rest of this article, I’ll take you through the fundamentals of using Ray to speed up CPU-intensive Python code, and we’ll set up some example code snippets to show you how easy it is to incorporate the power of Ray into your own workloads.

To get the most out of Ray, especially if you are a data scientist or machine learning engineer, there are a few key concepts you’ll want to understand first. Ray is made up of several components.

Ray Data is a scalable library designed for data processing in ML and AI workloads. It offers flexible, high-performance APIs for batch inference, data preprocessing, and data ingestion for ML training.

Ray Train is a versatile, scalable library designed for distributed machine learning training and fine-tuning.

Ray Tune is used for hyperparameter tuning.

Ray Serve is a scalable library for deploying models to facilitate online inference APIs.

Ray RLlib is used for scalable reinforcement learning.

As you can see, Ray is very focused on large language models and AI applications, but there’s one last essential component I haven’t mentioned yet, and it’s the one I’ll be using in this article.

Ray Core is designed for scaling CPU-intensive, general-purpose Python applications. It spreads your Python workload over all the available cores on whichever system you’re running on.

This article will focus exclusively on Ray Core.

Two essential concepts to understand within Ray Core are tasks and actors.

Tasks are stateless workers or services, implemented in Ray by decorating regular Python functions.

Actors (or stateful workers) are used, for instance, when you need to keep track of and maintain shared state across your distributed cluster. Actors are implemented by decorating regular Python classes.

Both actors and tasks are defined using the same @ray.remote decorator. Once defined, they are invoked with the special .remote() method provided by Ray. We’ll look at a fuller example of this shortly.
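To make the pattern concrete first, here’s a minimal sketch (the square task and Counter actor are purely illustrative and not part of our prime-counting example):

import ray

ray.init()

@ray.remote
def square(x: int) -> int:
    # A task: a stateless function that Ray can run in parallel
    return x * x

@ray.remote
class Counter:
    # An actor: a stateful class whose single instance lives in its own worker process
    def __init__(self):
        self.n = 0

    def incr(self) -> int:
        self.n += 1
        return self.n

# .remote() schedules the work and immediately returns an ObjectRef
refs = [square.remote(i) for i in range(4)]
print(ray.get(refs))                    # [0, 1, 4, 9]

counter = Counter.remote()              # create the actor
print(ray.get(counter.incr.remote()))   # 1
print(ray.get(counter.incr.remote()))   # 2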

Setting up a development environment

Before we start coding, we should set up a development environment to keep our projects siloed so they don’t interfere with one another. I’ll be using conda for this, but feel free to use whichever tool you prefer. I’ll be running my code in a Jupyter notebook in a WSL2 Ubuntu shell on Windows.

$ conda create -n ray-test python=3.13 -y 
$ conda activate ray-test
(ray-test) $ pip install -U "ray[default]"
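To confirm the installation, you can start a throwaway Ray instance and print the resources it detects; you should see a CPU entry matching your core count:

(ray-test) $ python -c "import ray; ray.init(); print(ray.cluster_resources())"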

Code example – counting prime numbers

Let’s revisit the example I gave at the beginning: counting the number of primes within the interval 10,000,000 to 20,000,000.

We’ll run our original Python code and time how long it takes.

import math, time, os

def is_prime(n: int) -> bool:
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    r = int(math.isqrt(n)) + 1
    for i in range(3, r, 2):
        if n % i == 0:
            return False
    return True

def count_primes(a: int, b: int) -> int:
    c = 0
    for n in range(a, b):
        if is_prime(n):
            c += 1
    return c

if __name__ == "__main__":
    A, B = 10_000_000, 20_000_000
    total_cpus = os.cpu_count() or 1

    # Start "chunky"; we will sweep this later
    chunks = max(4, total_cpus * 2)
    step = (B - A) // chunks

    print(f"CPUs~{total_cpus}, chunks={chunks}")
    t0 = time.time()
    results = []
    for i in range(chunks):
        s = A + i * step
        e = s + step if i < chunks - 1 else B
        results.append(count_primes(s, e))
    total = sum(results)
    print(f"total={total}, time={time.time() - t0:.2f}s")

And the output?

CPUs~32, chunks=64
total=606028, time=31.17s

Now, can we improve on that using Ray? Yes, by following this simple four-step process.

Step 1 - Initialise Ray. Add these two lines at the start of your code.

import ray

ray.init()

Step 2 - Create our remote function. That’s easy. Just decorate the function we want to optimise with the @ray.remote decorator. The function to decorate is the one that’s performing most of the work. In our example, that’s the count_primes function.

@ray.remote(num_cpus=1)
def count_primes(start: int, end: int) -> int:
    ...
    ...

Step 3 - Launch the parallel tasks. Call your remote function using the .remote() method.

refs.append(count_primes.remote(s, e))

Step 4 - Wait for all our tasks to finish. Each task in Ray returns an ObjectRef when it’s called. This is a promise from Ray: it means Ray has set the task off running remotely and will return its value at some point in the future. We collect the values behind all the ObjectRefs returned by our tasks using the ray.get() function, which blocks until every task has completed.

results = ray.get(refs)
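As a quick illustration of how an ObjectRef behaves, a single remote call to our decorated count_primes looks like this:

ref = count_primes.remote(2, 100)   # returns immediately with an ObjectRef
print(ref)                          # something like ObjectRef(c8ef45ccd0112571...)
print(ray.get(ref))                 # blocks until the task finishes, then prints 25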

Let’s put this all together. As you will see, the changes to our original code are minimal — just four lines of code added and a print statement to display the number of nodes and cores we’re running on.

import math
import time

# -----------------------------------------
# Change No. 1
# -----------------------------------------
import ray
ray.init()

def is_prime(n: int) -> bool:
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    r = int(math.isqrt(n)) + 1
    for i in range(3, r, 2):
        if n % i == 0:
            return False
    return True

# -----------------------------------------
# Change No. 2
# -----------------------------------------
@ray.remote(num_cpus=1)  # pure-Python loop → 1 CPU per task
def count_primes(a: int, b: int) -> int:
    c = 0
    for n in range(a, b):
        if is_prime(n):
            c += 1
    return c

if __name__ == "__main__":
    A, B = 10_000_000, 20_000_000
    total_cpus = int(ray.cluster_resources().get("CPU", 1))

    # Start "chunky"; we will sweep this later
    chunks = max(4, total_cpus * 2)
    step = (B - A) // chunks

    print(f"nodes={len(ray.nodes())}, CPUs~{total_cpus}, chunks={chunks}")
    t0 = time.time()
    refs = []
    for i in range(chunks):
        s = A + i * step
        e = s + step if i < chunks - 1 else B
        # -----------------------------------------
        # Change No. 3
        # -----------------------------------------
        refs.append(count_primes.remote(s, e))

    # -----------------------------------------
    # Change No. 4
    # -----------------------------------------
    total = sum(ray.get(refs))

    print(f"total={total}, time={time.time() - t0:.2f}s")

Now, has it all been worthwhile? Let’s run the new code and see what we get.

2025-11-01 13:36:30,650 INFO worker.py:2004 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
/home/tom/.local/lib/python3.10/site-packages/ray/_private/worker.py:2052: FutureWarning: Tip: In future versions of Ray, Ray will not override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
nodes=1, CPUs~32, chunks=64
total=606028, time=3.04s

Well, the result speaks for itself. The Ray code is 10x faster than the regular Python code. Not too shabby.

Where does this increase in speed come from? Ray can spread your workload across all the cores in your system. A core is like a mini-CPU. When we ran our original Python code, it used just one core. That’s fine, but if your CPU has more than one core, which most new PCs do, then you’re leaving money on the table, so to speak.

In my case, Ray reports 32 CPUs, so it’s not surprising that my Ray code was way faster than the non-Ray code.

Monitoring Ray jobs

Another point worth mentioning is that Ray makes it very easy to monitor job executions via a dashboard. Notice that in the output we received when running our Ray example code, we saw this,

...  -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265

It’s showing a local URL because I’m running this on my desktop. If you were running this on a cluster, the URL would point to a location on the cluster head node.
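If you need the dashboard reachable from another machine (for example, when Ray runs on a headless server), you can pin its host and port when initialising Ray. A small sketch, assuming ray[default] is installed so the dashboard components are available:

import ray

# Bind the dashboard to all interfaces on the default port
ray.init(dashboard_host="0.0.0.0", dashboard_port=8265)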

When you click on the given URL, you should see something similar to this,

[Screenshot of the Ray dashboard. Image by author]

From this main screen, you can drill down to monitor many aspects of your Ray programs using the menu links at the top of the page.

Using Ray actors

I previously mentioned that actors are an integral part of Ray Core. Actors are used to coordinate and share state between Ray tasks. For example, say you have a pool of worker tasks all calling an external, rate-limited API, and you want to enforce a global limit so that, across ALL running tasks, no more than five calls per second are made. Here is a sketch of the naive, global-variable approach you might think would work.

import time, ray

ray.init(ignore_reinit_error=True, log_to_driver=False)

GLOBAL_QPS = 5.0
# Module-level "global" state -- each Ray worker process ends up with its OWN copy
NEXT_TIME = 0.0

def acquire_slot():
    """Naive rate limiter based on a global variable (broken under Ray)."""
    global NEXT_TIME
    now = time.time()
    if now < NEXT_TIME:
        time.sleep(NEXT_TIME - now)
    # Reserve the next slot
    NEXT_TIME = max(NEXT_TIME + 1.0 / GLOBAL_QPS, time.time())

@ray.remote
def call_api_with_limit(n_calls: int) -> int:
    done = 0
    for _ in range(n_calls):
        acquire_slot()   # only limits THIS worker process, not the cluster
        done += 1        # pretend API call
    return done

if __name__ == "__main__":
    NUM_WORKERS = 10
    CALLS_EACH  = 20
    total_calls = NUM_WORKERS * CALLS_EACH

    t0 = time.time()
    ray.get([call_api_with_limit.remote(CALLS_EACH) for _ in range(NUM_WORKERS)])
    dt = time.time() - t0

    print(f"Total calls: {total_calls}")
    print(f"Intended GLOBAL_QPS: {GLOBAL_QPS}")
    print(f"Expected time if truly global-limited: ~{total_calls / GLOBAL_QPS:.2f}s")
    print(f"Actual time with 'global var' (broken): {dt:.2f}s")
    print(f"Observed cluster QPS: ~{total_calls / dt:.1f} (should have been ~{GLOBAL_QPS})")

We’ve used a global variable to throttle the rate of our tasks, and the code is syntactically correct, running without error. Unfortunately, you won’t get the result you expected. That’s because each Ray task runs in its own process space and has its own copy of the global variable. The global variable is NOT shared between tasks. So when we run the above code, we’ll see output like this,

Total calls: 200
Intended GLOBAL_QPS: 5.0
Expected time if truly global-limited: ~40.00s
Actual time with 'global var' (broken): 3.80s
Observed cluster QPS: ~52.6 (should have been ~5.0)

To fix this, we use an actor. Recall that an actor is just a Ray-decorated Python class. Here is the code using an actor.

import time, ray

ray.init(ignore_reinit_error=True, log_to_driver=False)

# This is our actor
@ray.remote
class GlobalPacer:
    """Serialize calls so cluster-wide rate <= qps."""
    def __init__(self, qps: float):
        self.interval = 1.0 / qps
        self.next_time = time.time()

    def acquire(self):
        # Wait inside the actor until we can proceed
        now = time.time()
        if now < self.next_time:
            time.sleep(self.next_time - now)
        # Reserve the next slot; guard against drift
        self.next_time = max(self.next_time + self.interval, time.time())
        return True

@ray.remote
def call_api_with_limit(n_calls: int, pacer):
    done = 0
    for _ in range(n_calls):
        # Wait for global permission
        ray.get(pacer.acquire.remote())
        # pretend API call (no extra sleep here)
        done += 1
    return done

if __name__ == "__main__":
    NUM_WORKERS = 10
    CALLS_EACH  = 20
    GLOBAL_QPS  = 5.0  # cluster-wide cap

    total_calls = NUM_WORKERS * CALLS_EACH
    expected_min_time = total_calls / GLOBAL_QPS

    pacer = GlobalPacer.remote(GLOBAL_QPS)

    t0 = time.time()
    ray.get([call_api_with_limit.remote(CALLS_EACH, pacer) for _ in range(NUM_WORKERS)])
    dt = time.time() - t0

    print(f"Total calls: {total_calls}")
    print(f"Global QPS cap: {GLOBAL_QPS}")
    print(f"Expected time (if capped at {GLOBAL_QPS} QPS): ~{expected_min_time:.2f}s")
    print(f"Actual time with actor: {dt:.2f}s")
    print(f"Observed cluster QPS: ~{total_calls/dt:.1f}")

Our limiter code is encapsulated in a class (GlobalPacer) decorated with @ray.remote, and because there is a single actor instance, its state is shared by all running tasks. We can see the difference this makes by running the updated code.

Total calls: 200
Global QPS cap: 5.0
Expected time (if capped at 5.0 QPS): ~40.00s
Actual time with actor: 39.86s
Observed cluster QPS: ~5.0

Summary

This article introduced Ray, an open-source Python framework that makes it easy to scale compute-intensive programs from a single core to multiple cores, or even a cluster, with minimal code changes.

I briefly mentioned the key components of Ray—Ray Data, Ray Train, Ray Tune, Ray Serve, and Ray Core—emphasising that Ray Core is the component designed for general-purpose CPU scaling.

I explained some of the essential concepts in Ray Core, such as tasks (stateless parallel functions), actors (stateful workers for shared state and coordination), and ObjectRefs (a future promise of a task’s return value).

To showcase the benefits of using Ray, I started with a simple CPU-intensive example — counting prime numbers over a range — and showed how slow a naive, single-core Python implementation can be.

Instead of rewriting the code in another language or using more complex multiprocessing libraries, Ray lets you parallelise the workload in just four easy steps and only a few extra lines of code:

  • ray.init() to start Ray
  • Decorate your functions with @ray.remote to turn them into parallel tasks
  • .remote() to launch tasks concurrently, and
  • ray.get() to collect task results.

This approach cut the runtime of the prime-counting example from ~30 seconds to ~3 seconds on my 32-CPU machine.

I also showed how easy it is to monitor running jobs in Ray using its built-in dashboard and how to access it.

Finally, I provided an example of using a Ray actor, showing why global variables are not suitable for coordinating state across tasks, since each worker has its own memory space.

In the second part of this series, we’ll see how to take things to another level by enabling Ray jobs to use even more CPU power as we scale to large multi-node clusters in the cloud via Amazon Web Services.
