Chapter 4. Remote Actors
In the previous chapter, you learned about Ray remote functions, which are useful for the parallel execution of stateless functions. But what if you need to maintain a state between invocations? Examples of such situations span from a simple counter to a neural network during training to a simulator environment.
One option for maintaining state in these situations is to return the state along with the result and pass it to the next call. Although technically this will work, this is not the best solution, because of the large amount of data that has to be passed around (especially as the size of the state starts to grow). Ray uses actors, which we will cover in this chapter, to manage state.
Note
Much like Ray’s remote functions, all Ray actors are remote actors, even when running on the same machine.
In a nutshell, an actor is a computer process with an address (handle). This means that an actor can also store things in memory, private to the actor process. Before delving into the details of implementing and scaling Ray actors, let’s take a look at the concepts behind them. Actors come from the actor model design pattern. Understanding the actor model is key to effectively managing state and concurrency.
Understanding the Actor Model
The actor model was introduced by Carl Hewitt in 1973 to deal with concurrent computation. The heart of this conceptual model is an actor, a universal primitive of concurrent computation that carries its own state.
An actor has a simple job:
- Store data
- Receive messages from other actors
- Pass messages to other actors
- Create additional child actors
The data that an actor stores is private to the actor and isn’t visible from outside; it can be accessed and modified only by the actor itself. Changing the actor’s state requires sending messages to the actor that will modify the state. (Compare this to using method calls in object-oriented programming.)
To ensure an actor’s state consistency, actors process one request at a time. All actor method invocations are globally serialized for a given actor. To improve throughput, people often create a pool of actors (assuming they can shard or replicate the actor’s state).
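To make the idea of serialized message processing concrete, here is a minimal plain-Python sketch of an actor (it does not use Ray). The class name CounterActor and its message names are assumptions made for illustration only. The actor keeps its state private, receives messages through a mailbox, and handles them one at a time on a single worker thread.

```python
import queue
import threading


class CounterActor:
    """A toy actor: private state, a mailbox, and one worker thread."""

    def __init__(self):
        self._count = 0                    # private state, never shared directly
        self._mailbox = queue.Queue()      # incoming messages wait here
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self):
        # Messages are handled strictly one at a time, in arrival order.
        while True:
            message, reply = self._mailbox.get()
            if message == "stop":
                reply.put(None)
                break
            if message == "increment":
                self._count += 1
            reply.put(self._count)         # every message gets the current count

    def send(self, message):
        """Post a message and return a queue that will hold the reply."""
        reply = queue.Queue(maxsize=1)
        self._mailbox.put((message, reply))
        return reply


actor = CounterActor()
replies = []


def post():
    replies.append(actor.send("increment"))


# Ten senders post concurrently, yet no lock is needed around _count.
senders = [threading.Thread(target=post) for _ in range(10)]
for t in senders:
    t.start()
for t in senders:
    t.join()
for r in replies:
    r.get()                                # wait until every message is processed

final_count = actor.send("read").get()
actor.send("stop").get()
print(final_count)
```

Because only the worker thread ever touches the counter, the senders cannot corrupt it; the price is that requests are processed sequentially, which is why pools of actors are often used to raise throughput.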
The actor model is a good fit for many distributed system scenarios. Here are some typical use cases where the actor model can be advantageous:
- You need to deal with a large distributed state that is hard to synchronize between invocations.
- You want to work with single-threaded objects that do not require significant interaction from external components.
In both situations, you would implement the standalone parts of the work inside an actor. You can put each piece of independent state inside its own actor, and then any changes to the state come in through the actor. Most actor system implementations avoid concurrency issues by using only single-threaded actors.
Now that you know the general principles of the actor model, let’s take a closer look at Ray’s remote actors.
Creating a Basic Ray Remote Actor
Ray implements remote actors as stateful workers. When you create a new remote actor, Ray creates a new worker and schedules the actor’s methods on that worker.
A common example of an actor is a bank account. Let’s take a look at how to implement an account by using Ray remote actors. Creating a Ray remote actor is as simple as decorating a Python class with the @ray.remote decorator (Example 4-1).
Example 4-1. Implementing a Ray remote actor
    import ray

    @ray.remote
    class Account:
        def __init__(self, balance: float, minimal_balance: float):
            self.minimal = minimal_balance
            if balance < minimal_balance:
                raise Exception("Starting balance is less than minimal balance")
            # Stored as _balance so the attribute does not shadow the balance() method
            self._balance = balance

        def balance(self) -> float:
            return self._balance

        def deposit(self, amount: float) -> float:
            if amount < 0:
                raise Exception("Cannot deposit negative amount")
            self._balance = self._balance + amount
            return self._balance

        def withdraw(self, amount: float) -> float:
            if amount < 0:
                raise Exception("Cannot withdraw negative amount")
            balance = self._balance - amount
            if balance < self.minimal:
                raise Exception("Withdrawal is not supported by current balance")
            self._balance = balance
            return balance
The Account actor class itself is fairly simple and has four methods:
- The constructor: Creates an account based on the starting and minimum balance. It also makes sure that the starting balance is not less than the minimum balance and throws an exception otherwise.
- balance: Returns the current balance of the account. Because an actor’s state is private to the actor, access to it is available only through the actor’s methods.
- deposit: Deposits an amount to the account and returns the new balance.
- withdraw: Withdraws an amount from the account and returns the new balance. It also ensures that the remaining balance does not fall below the predefined minimum balance and throws an exception otherwise.
Now that you have defined the class, you need to use .remote to create an instance of this actor (Example 4-2).
Example 4-2. Creating an instance of your Ray remote actor
    account_actor = Account.remote(balance=100., minimal_balance=20.)
Here, account_actor represents an actor handle. These handles play an important role in the actor’s lifecycle. Actor processes are terminated automatically when the initial actor handle goes out of scope in Python (note that in this case, the actor’s state is lost).
Tip
You can create multiple distinct actors from the same class. Each will have its own independent state.
As with an ObjectRef, you can pass an actor handle as a parameter to another actor, to a Ray remote function, or to ordinary Python code.
Note that Example 4-1 uses the @ray.remote decorator to define an ordinary Python class as a Ray remote actor. Alternatively, instead of using the decorator, you can convert a Python class into a remote actor as shown in Example 4-3.
Example 4-3. Creating an instance of a Ray remote actor without the decorator
    # Account here is the plain (undecorated) Python class
    Account = ray.remote(Account)
    account_actor = Account.remote(balance=100., minimal_balance=20.)
Once you have a remote actor in place, you can invoke its methods as shown in Example 4-4.
Example 4-4. Invoking a remote actor
    print(f"Current balance {ray.get(account_actor.balance.remote())}")
    print(f"New balance {ray.get(account_actor.withdraw.remote(40.))}")
    print(f"New balance {ray.get(account_actor.deposit.remote(30.))}")
Tip
It’s important to handle exceptions, which in this example can be raised by both the deposit and withdraw methods. To handle them, wrap the calls from Example 4-4 in try/except clauses:
    try:
        result = ray.get(account_actor.withdraw.remote(-40.))
    except Exception as e:
        print(f"Oops! {e} occurred.")
This way the calling code intercepts any exception thrown by the actor’s code and can take the necessary action.
You can also create named actors by using Example 4-5.
Example 4-5. Creating a named actor
    account_actor = Account.options(name='Account').remote(
        balance=100., minimal_balance=20.)
Once the actor has a name, you can use it to obtain the actor’s handle from any place in the code:
    ray.get_actor('Account')
As described previously, by default an actor’s lifecycle is tied to its handle being in scope.
An actor’s lifetime can be decoupled from its handle being in scope, allowing an actor to persist even after the driver process exits. You can create a detached actor by specifying the lifetime parameter as detached (Example 4-6).
Example 4-6. Making a detached actor
    account_actor = Account.options(name='Account', lifetime='detached').remote(
        balance=100., minimal_balance=20.)
In theory, you can make an actor detached without specifying its name, but since ray.get_actor operates by name, detached actors make the most sense with a name. You should name your detached actors so you can access them even after the actor’s handle is out of scope. The detached actor itself can own any other tasks and objects.
In addition, you can manually delete actors from inside an actor, using ray.actor.exit_actor, or from outside by passing the actor’s handle to ray.kill, as in ray.kill(account_actor). This can be useful if you know that you do not need specific actors anymore and want to reclaim the resources.
As shown here, creating a basic Ray actor and managing its lifecycle is fairly easy, but what happens if the Ray node on which the actor is running goes down for some reason?1 The @ray.remote decorator allows you to specify two parameters that control behavior in this case:
- max_restarts: Specifies the maximum number of times that the actor should be restarted when it dies unexpectedly. The minimum valid value is 0 (the default), which indicates that the actor doesn’t need to be restarted. A value of -1 indicates that the actor should be restarted indefinitely.
- max_task_retries: Specifies the number of times to retry an actor’s task if the task fails because of a system error. If set to -1, the system will retry the failed task until the task succeeds or the actor has reached its max_restarts limit. If set to n > 0, the system will retry the failed task up to n times, after which the task will throw a RayActorError exception upon ray.get.
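The retry rule for max_task_retries can be pictured with a small plain-Python simulation. This is only a sketch of the counting logic described above, not Ray’s implementation: the helper call_with_retries, the SystemFailure exception, and make_flaky are hypothetical names introduced for illustration.

```python
class SystemFailure(Exception):
    """Hypothetical stand-in for a system-level failure (not a Ray class)."""


def call_with_retries(task, max_task_retries=0):
    """Run task(); on SystemFailure retry up to max_task_retries times.

    A value of -1 means: keep retrying until the task succeeds.
    Returns the task result together with the number of attempts made.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts
        except SystemFailure:
            retries_used = attempts - 1
            if max_task_retries != -1 and retries_used >= max_task_retries:
                raise                      # retry budget exhausted


def make_flaky(failures):
    """Return a task that fails `failures` times and then succeeds."""
    remaining = [failures]

    def task():
        if remaining[0] > 0:
            remaining[0] -= 1
            raise SystemFailure("simulated node failure")
        return "done"

    return task


# Two failures fit into a budget of three retries, so the third attempt succeeds.
result, attempts = call_with_retries(make_flaky(2), max_task_retries=3)
print(result, attempts)
```

With a budget of one retry, the same flaky task would exhaust its retries and the failure would surface to the caller, which mirrors the exception you see upon ray.get.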
As further explained in the next chapter and in the Ray fault-tolerance documentation, when an actor is restarted, Ray will re-create its state by rerunning its constructor. Therefore, if a state was changed during the actor’s execution, it will be lost. To preserve such a state, an actor has to implement its custom persistence.
In our example case, the actor’s state is lost on failure since we haven’t used actor persistence. This might be OK for some use cases but is not acceptable for others—see also the Ray documentation on design patterns. In the next section, you will learn how to programmatically implement custom actor persistence.
Implementing the Actor’s Persistence
In this implementation, the state is saved as a whole, which works well enough if the size of the state is relatively small and the state changes are relatively rare. Also, to keep our example simple, we use local disk persistence. In reality, for a distributed Ray case, you should consider using Network File System (NFS), Amazon Simple Storage Service (S3), or a database to enable access to the actor’s data from any node in the Ray cluster.
A persistent Account actor is presented in Example 4-7.2
Example 4-7. Defining a persistent actor, using filesystem persistence
    from os.path import exists

    import ray

    @ray.remote
    class Account:
        def __init__(self, balance: float, minimal_balance: float,
                     account_key: str, basedir: str = '.'):
            self.basedir = basedir
            self.key = account_key
            # Restore saved state if it exists; otherwise use the arguments
            if not self.restorestate():
                if balance < minimal_balance:
                    raise Exception("Starting balance is less than minimal balance")
                # Stored as _balance so the attribute does not shadow balance()
                self._balance = balance
                self.minimal = minimal_balance
                self.storestate()

        def balance(self) -> float:
            return self._balance

        def deposit(self, amount: float) -> float:
            if amount < 0:
                raise Exception("Cannot deposit negative amount")
            self._balance = self._balance + amount
            self.storestate()
            return self._balance

        def withdraw(self, amount: float) -> float:
            if amount < 0:
                raise Exception("Cannot withdraw negative amount")
            balance = self._balance - amount
            if balance < self.minimal:
                raise Exception("Withdrawal is not supported by current balance")
            self._balance = balance
            self.storestate()
            return balance

        def restorestate(self) -> bool:
            path = self.basedir + '/' + self.key
            if exists(path):
                with open(path, "rb") as f:
                    data = f.read()
                state = ray.cloudpickle.loads(data)
                self._balance = state['balance']
                self.minimal = state['minimal']
                return True
            else:
                return False

        def storestate(self):
            # Named data rather than bytes to avoid shadowing the built-in
            data = ray.cloudpickle.dumps(
                {'balance': self._balance, 'minimal': self.minimal})
            with open(self.basedir + '/' + self.key, "wb") as f:
                f.write(data)
If we compare this implementation with the original in Example 4-1, we will notice several important changes:
- Here the constructor has two additional parameters: account_key and basedir. The account key is a unique identifier for the account that is also used as the name of the persistence file. The basedir parameter indicates a base directory used for storing persistence files. When the constructor is invoked, we first check whether a persistent state for this account is saved, and if there is one, we ignore the passed-in balance and minimum balance and restore them from the persistent state.
- Two additional methods are added to the class: storestate and restorestate. The storestate method stores the actor’s state in a file. State information is represented as a dictionary with the names of the state elements as keys and the state elements’ values as values. We are using Ray’s implementation of cloud pickling to convert this dictionary to a byte string and then write this byte string to the file defined by the account key and base directory. (Chapter 5 provides a detailed discussion of cloud pickling.) The restorestate method restores the state from the file defined by the account key and base directory. The method reads a binary string from the file and uses Ray’s implementation of cloud pickling to convert it to a dictionary. Then it uses the content of the dictionary to populate the state.
- Finally, both the deposit and withdraw methods, which change the state, use the storestate method to update persistence.
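The effect of these changes on a restart can be tried out without a cluster. The following is a minimal sketch, assuming a plain (non-actor) class and the standard pickle module as a stand-in for Ray’s cloud pickling; the class name PersistentCounter and its fields are hypothetical. It saves the whole state after every change, and a second construction with the same key picks up the saved state instead of the constructor argument.

```python
import os
import pickle
import tempfile


class PersistentCounter:
    """Plain class (not a Ray actor) that saves its whole state on every change."""

    def __init__(self, start: int, key: str, basedir: str):
        self.path = os.path.join(basedir, key)
        if not self.restore_state():
            self.value = start             # no saved state yet: use the argument
            self.store_state()

    def add(self, amount: int) -> int:
        self.value += amount
        self.store_state()                 # persist after each state change
        return self.value

    def restore_state(self) -> bool:
        if not os.path.exists(self.path):
            return False
        with open(self.path, "rb") as f:
            self.value = pickle.load(f)["value"]
        return True

    def store_state(self) -> None:
        with open(self.path, "wb") as f:
            pickle.dump({"value": self.value}, f)


with tempfile.TemporaryDirectory() as basedir:
    first = PersistentCounter(start=100, key="demo", basedir=basedir)
    first.add(30)
    # Simulate a restart: the constructor runs again with the original arguments.
    restarted = PersistentCounter(start=100, key="demo", basedir=basedir)
    restored_value = restarted.value

print(restored_value)
```

The restarted object reports the saved value rather than the starting value passed to its constructor, which is the behavior a restarted actor needs in order to survive a rerun of its constructor.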
The implementation shown in Example 4-7 works fine, but our account actor implementation now contains too much persistence-specific code and is tightly coupled to file persistence. A better solution is to move the persistence-specific code into a separate class.
We start by creating an abstract class defining methods that have to be implemented by any persistence class (Example 4-8).
Example 4-8. Defining a base persistence class
    class BasePersitence:
        def exists(self, key: str) -> bool:
            pass

        def save(self, key: str, data: dict):
            pass

        def restore(self, key: str) -> dict:
            pass
This class defines all the methods that have to be implemented by a concrete persistence implementation. With this in place, a file persistence class implementing base persistence can be defined as shown in Example 4-9.
Example 4-9. Defining a file persistence class
    from os.path import exists

    import ray

    class FilePersistence(BasePersitence):
        def __init__(self, basedir: str = '.'):
            self.basedir = basedir

        def exists(self, key: str) -> bool:
            return exists(self.basedir + '/' + key)

        def save(self, key: str, data: dict):
            # Named payload rather than bytes to avoid shadowing the built-in
            payload = ray.cloudpickle.dumps(data)
            with open(self.basedir + '/' + key, "wb") as f:
                f.write(payload)

        def restore(self, key: str) -> dict:
            if not self.exists(key):
                return None
            else:
                with open(self.basedir + '/' + key, "rb") as f:
                    payload = f.read()
                return ray.cloudpickle.loads(payload)
This implementation factors out most of the persistence-specific code from our original implementation in Example 4-7. Now it is possible to simplify and generalize an account implementation; see Example 4-10.
Example 4-10. Implementing a persistent actor with pluggable persistence
    @ray.remote
    class Account:
        def __init__(self, balance: float, minimal_balance: float,
                     account_key: str, persistence: BasePersitence):
            self.persistence = persistence
            self.key = account_key
            if not self.restorestate():
                if balance < minimal_balance:
                    raise Exception("Starting balance is less than minimal balance")
                # Stored as _balance so the attribute does not shadow balance()
                self._balance = balance
                self.minimal = minimal_balance
                self.storestate()

        def balance(self) -> float:
            return self._balance

        def deposit(self, amount: float) -> float:
            if amount < 0:
                raise Exception("Cannot deposit negative amount")
            self._balance = self._balance + amount
            self.storestate()
            return self._balance

        def withdraw(self, amount: float) -> float:
            if amount < 0:
                raise Exception("Cannot withdraw negative amount")
            balance = self._balance - amount
            if balance < self.minimal:
                raise Exception("Withdrawal is not supported by current balance")
            self._balance = balance
            self.storestate()
            return balance

        def restorestate(self) -> bool:
            state = self.persistence.restore(self.key)
            if state is not None:
                self._balance = state['balance']
                self.minimal = state['minimal']
                return True
            else:
                return False

        def storestate(self):
            self.persistence.save(
                self.key, {'balance': self._balance, 'minimal': self.minimal})
Compared with our original persistent actor implementation (Example 4-7), only the constructor and the restorestate and storestate methods change. Note that the constructor now takes a BasePersitence instance (the class name is spelled as in Example 4-8), which allows for easily changing the persistence implementation without changing the actor’s code. Additionally, the restorestate and storestate methods are generalized to move all the persistence-specific code to the persistence class.
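To illustrate how a second backend could plug into the same interface, here is a minimal sketch of an in-memory persistence class. InMemoryPersistence is a hypothetical name that does not appear in the original examples; the base class is repeated (with the spelling used in Example 4-8) only so that the sketch runs on its own.

```python
import copy


class BasePersitence:                      # same interface as Example 4-8
    def exists(self, key: str) -> bool:
        pass

    def save(self, key: str, data: dict):
        pass

    def restore(self, key: str) -> dict:
        pass


class InMemoryPersistence(BasePersitence):
    """Hypothetical backend that keeps state in a dict; handy for unit tests."""

    def __init__(self):
        self._store = {}

    def exists(self, key: str) -> bool:
        return key in self._store

    def save(self, key: str, data: dict):
        self._store[key] = copy.deepcopy(data)       # snapshot, not a live reference

    def restore(self, key: str) -> dict:
        # Returns None when nothing has been saved, like the file-based version.
        return copy.deepcopy(self._store.get(key))


persistence = InMemoryPersistence()
missing = persistence.restore("1234567")
persistence.save("1234567", {"balance": 100.0, "minimal": 20.0})
state = persistence.restore("1234567")
print(missing, state)
```

A backend like this is mainly useful for testing the account logic locally. Ray serializes the arguments passed to an actor’s constructor, so each actor would receive its own copy of the dictionary, and nothing would survive the loss of the actor process.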
This implementation is flexible enough to support different persistence implementations, but if a persistence implementation requires permanent connections to a persistence source (for example, a database connection), it can become unscalable by simultaneously maintaining too many connections. In this case, we can implement persistence as an additional actor. But this requires scaling of this actor. Let’s take a look at the options that Ray provides for scaling actors.
Scaling Ray Remote Actors
The original actor model described earlier in this chapter typically assumes that actors are lightweight (e.g., contain a single piece of state) and do not require scaling or parallelization. In Ray and similar systems (including Akka), actors are often used for coarser-grained implementations and can require scaling.3
As with Ray remote functions, you can scale actors either horizontally (across processes/machines) with pools or vertically (with more resources). “Resources / Vertical Scaling” covers how to request more resources, but for now, let’s focus on horizontal scaling.
You can add more processes for horizontal scaling with Ray’s actor pool, the ActorPool class provided by the ray.util module. This class is similar to a multiprocessing pool and lets you schedule your tasks over a fixed pool of actors.
The actor pool effectively uses a fixed set of actors as a single entity and manages which actor in the pool gets the next request. Note that actors in the pool are still individual actors and their state is not merged. So this scaling option works only when an actor’s state is created in the constructor and does not change during the actor’s execution.
Let’s take a look at how to use an actor pool to improve the scalability of our account class (Example 4-11).
Example 4-11. Using an actor’s pool for implementing persistence
    from ray.util import ActorPool

    # Assumes FilePersistence (Example 4-9) has been declared with @ray.remote
    pool = ActorPool([
        FilePersistence.remote(), FilePersistence.remote(), FilePersistence.remote()])

    @ray.remote
    class Account:
        def __init__(self, balance: float, minimal_balance: float,
                     account_key: str, persistence: ActorPool):
            self.persistence = persistence
            self.key = account_key
            if not self.restorestate():
                if balance < minimal_balance:
                    raise Exception("Starting balance is less than minimal balance")
                # Stored as _balance so the attribute does not shadow balance()
                self._balance = balance
                self.minimal = minimal_balance
                self.storestate()

        def balance(self) -> float:
            return self._balance

        def deposit(self, amount: float) -> float:
            if amount < 0:
                raise Exception("Cannot deposit negative amount")
            self._balance = self._balance + amount
            self.storestate()
            return self._balance

        def withdraw(self, amount: float) -> float:
            if amount < 0:
                raise Exception("Cannot withdraw negative amount")
            balance = self._balance - amount
            if balance < self.minimal:
                raise Exception("Withdrawal is not supported by current balance")
            self._balance = balance
            self.storestate()
            return balance

        def restorestate(self) -> bool:
            # Drain results of earlier requests so the next result is ours
            while self.persistence.has_next():
                self.persistence.get_next()
            self.persistence.submit(lambda a, v: a.restore.remote(v), self.key)
            state = self.persistence.get_next()
            if state is not None:
                print(f'Restoring state {state}')
                self._balance = state['balance']
                self.minimal = state['minimal']
                return True
            else:
                return False

        def storestate(self):
            # The pool passes a single value, so the arguments travel as a tuple
            # and are unpacked when calling save(key, data)
            self.persistence.submit(
                lambda a, v: a.save.remote(*v),
                (self.key, {'balance': self._balance, 'minimal': self.minimal}))

    account_actor = Account.options(name='Account').remote(
        balance=100., minimal_balance=20.,
        account_key='1234567', persistence=pool)
Compared with Example 4-10, only the persistence calls change. The code starts by creating a pool of three identical file persistence actors, and then this pool is passed to the account implementation.
The syntax of a pool-based execution is a lambda function that takes two parameters: an actor reference and a value to be submitted to the function. The limitation here is that the value is a single object. One solution for functions with multiple parameters is to pack them into a tuple, which can contain an arbitrary number of components. The function itself is defined as a remote invocation of the required actor’s method.
An execution on the pool is asynchronous (it routes requests to one of the remote actors internally). This allows faster execution of the storestate method, which does not need the results from data storage. Here the implementation does not wait for the state storage to complete; it just starts the execution. The restorestate method, on the other hand, needs the result of the pool invocation to proceed. The pool implementation internally manages the process of waiting for execution results to become ready and exposes this functionality through the get_next function (note that this is a blocking call). The pool’s implementation manages a queue of execution results (in the same order as the requests). Whenever we need to get a result from the pool, we therefore must first clear out the pool’s results queue to ensure that we get the right result.
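The two points above, packing several arguments into one tuple and draining older results before reading a new one, can be sketched without a cluster. The OrderedPool class below is a hypothetical stand-in built on a thread pool; it is not Ray’s ActorPool and only mimics the submit, has_next, and get_next behavior described in the text. The save function and the store dictionary are likewise assumptions for illustration.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor


class OrderedPool:
    """Hypothetical stand-in that mimics submit/has_next/get_next ordering."""

    def __init__(self, workers: int = 3):
        self._executor = ThreadPoolExecutor(max_workers=workers)
        self._pending = deque()            # futures kept in submission order

    def submit(self, fn, value):
        # As with the pool in the text, fn receives exactly one value.
        self._pending.append(self._executor.submit(fn, value))

    def has_next(self) -> bool:
        return bool(self._pending)

    def get_next(self):
        # Blocks until the oldest outstanding request has finished.
        return self._pending.popleft().result()


store = {}


def save(pair):
    key, data = pair                       # unpack the tuple of arguments
    store[key] = data


pool = OrderedPool()
pool.submit(save, ("alice", {"balance": 90.0}))    # fire and forget
pool.submit(save, ("bob", {"balance": 120.0}))     # fire and forget

# Drain stale results first; otherwise get_next() would return a save result.
while pool.has_next():
    pool.get_next()

pool.submit(store.get, "bob")
state = pool.get_next()
print(state)
```

Skipping the draining loop would make the final get_next call return the result of the first save (None) instead of the restored state, which is the mix-up the restore method has to guard against.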
In addition to the multiprocessing-based scaling provided by the actor pool, Ray supports scaling of the actor’s execution through concurrency. Ray offers two types of concurrency within an actor: threading and async execution.
When using concurrency inside actors, keep in mind that Python’s global interpreter lock (GIL) allows only one thread to run Python code at a time, so pure Python code will not achieve true parallelism. On the other hand, if you invoke NumPy, Cython, TensorFlow, or PyTorch code, these libraries will release the GIL when calling into C/C++ functions. By overlapping the time spent waiting for I/O or working in native libraries, both threading and async actor execution can achieve some parallelism.
The asyncio library can be thought of as cooperative multitasking: your code or library needs to explicitly signal that it is waiting on a result, and Python can go ahead and execute another task by explicitly switching execution context. asyncio works by having a single process running through an event loop and changing which task it is executing when a task yields/awaits. asyncio tends to have lower overhead than multithreaded execution and can be a little easier to reason about. Ray actors, but not remote functions, integrate with asyncio, allowing you to write asynchronous actor methods.
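Cooperative multitasking is easy to observe with plain asyncio, outside of Ray. In this minimal sketch (the worker and main names are illustrative), two coroutines share one thread and one event loop, and control changes hands only at the explicit await.

```python
import asyncio

events = []


async def worker(name: str, steps: int):
    for step in range(steps):
        events.append(f"{name}:{step}")
        await asyncio.sleep(0)             # explicit yield point: let other tasks run


async def main():
    # Both coroutines run on the same thread, driven by one event loop.
    await asyncio.gather(worker("a", 2), worker("b", 2))


asyncio.run(main())
print(events)                              # ['a:0', 'b:0', 'a:1', 'b:1']
```

The interleaved order shows that each task runs until it awaits and then hands control back to the loop; a task that never awaits would keep the other one from running at all.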
You should use threaded execution when your code spends a lot of time blocking but does not yield control by calling await. Threads are managed by the operating system, which decides when to run which thread. Using threaded execution can involve fewer code changes, as you do not need to explicitly indicate where your code is yielding. This can also make threaded execution more difficult to reason about.
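The same effect can be seen with ordinary Python threads, outside of Ray. This minimal sketch (blocking_call is an illustrative name) runs three blocking calls that never await; because each one waits on its own thread, the waits overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_call(seconds: float) -> float:
    time.sleep(seconds)                    # blocks this thread; there is no await here
    return seconds


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(blocking_call, [0.2, 0.2, 0.2]))
elapsed = time.perf_counter() - start

# The three waits overlap, so the total is close to 0.2 s rather than 0.6 s.
print(results, round(elapsed, 2))
```

No change to blocking_call was needed to get the overlap, which is the main attraction of threads for code that calls blocking libraries you do not control.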
You need to be careful and selectively use locks when accessing or modifying objects with both threads and asyncio. In both approaches, your objects share the same memory. By using locks, you ensure that only one thread or task can access the specific memory. Locks have some overhead (which increases as more processes or threads are waiting on a lock). As a result, an actor’s concurrency is mostly applicable for use cases when a state is populated in a constructor and never changes.
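As a minimal illustration of guarding shared state, the following plain-Python sketch (SharedCounter and work are illustrative names, not part of Ray) protects a read-modify-write update with a lock so that concurrent threads cannot lose increments.

```python
import threading


class SharedCounter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        # The read-modify-write below is not atomic, so guard it with the lock.
        with self._lock:
            self.value += 1


counter = SharedCounter()


def work():
    for _ in range(10_000):
        counter.increment()


threads = [threading.Thread(target=work) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)                       # 40000
```

Every increment now pays for acquiring and releasing the lock, which is the overhead mentioned above and the reason read-only state is the most comfortable fit for concurrent actors.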
To create an actor that uses asyncio, you need to define at least one async method. In this case, Ray will create an asyncio event loop for executing the actor’s methods. Submitting tasks to these actors is the same from the caller’s perspective as submitting tasks to a regular actor. The only difference is that when the task is run on the actor, it is posted to an asyncio event loop running in a background thread or thread pool instead of running directly on the main thread. (Note that using blocking ray.get or ray.wait calls inside an async actor method is not allowed, because they will block the execution of the event loop.)
Example 4-12 presents an example of a simple async actor.
Example 4-12. Creating a simple async actor
    import asyncio

    import ray

    @ray.remote
    class AsyncActor:
        async def computation(self, num):
            print(f'Actor waiting for {num} sec')
            for x in range(num):
                await asyncio.sleep(1)
                print(f'Actor slept for {x + 1} sec')
            return num
Because the method computation is defined as async, Ray will create an async actor. Note that unlike ordinary async methods, which require await to invoke them, using Ray async actors does not require any special invocation semantics. Additionally, Ray allows you to specify the maximum concurrency for the async actor’s execution during the actor’s creation:
    actor = AsyncActor.options(max_concurrency=5).remote()
To create a threaded actor, you need to specify max_concurrency during actor creation (Example 4-13).
Example 4-13. Creating a simple threaded actor
    from time import sleep

    import ray

    @ray.remote
    class ThreadedActor:
        def computation(self, num):
            print(f'Actor waiting for {num} sec')
            for x in range(num):
                sleep(1)
                print(f'Actor slept for {x + 1} sec')
            return num

    actor = ThreadedActor.options(max_concurrency=3).remote()
Tip
Because both async and threaded actors use max_concurrency, the type of actor created might be a little confusing. The thing to remember is that if max_concurrency is used, the actor can be either async or threaded. If at least one of the actor’s methods is async, the actor is async; otherwise, it is a threaded one.
So, which scaling approach should we use for our implementation? “Multiprocessing vs. Threading vs. AsyncIO in Python” by Lei Mao provides a good summary of features for various approaches (Table 4-1).
Scaling approach | Feature | Usage criteria |
---|---|---|
Actor pool | Multiple processes, high CPU utilization | CPU bound |
Async actor | Single process, single thread, cooperative multitasking, tasks cooperatively decide on switching | Slow I/O bound |
Threaded actor | Single process, multiple threads, preemptive multitasking, OS decides on task switching | Fast I/O bound and nonasync libraries you do not control |
Ray Remote Actors Best Practices
Because Ray remote actors are effectively remote functions, all the Ray remote best practices described in the previous chapter are applicable. In addition, Ray has some actor-specific best practices.
As mentioned before, Ray offers support for actors’ fault tolerance. Specifically for actors, you can specify max_restarts to automatically enable restarting for Ray actors. When your actor or the node hosting that actor crashes, the actor will be automatically reconstructed. However, this doesn’t provide ways for you to restore application-level state in your actor. Consider the actor persistence approaches described in this chapter to ensure restoration of application-level state as well.
If your applications have global variables that you have to change, do not change them in remote functions. Instead, use actors to encapsulate them and access them through the actor’s methods. This is because remote functions are running in different processes and do not share the same address space. As a result, these changes are not reflected across Ray driver and remote functions.
One of the common application use cases is the execution of the same remote function many times for different datasets. Using the remote functions directly can cause delays because of the creation of new processes for the function. This approach can also overwhelm the Ray cluster with a large number of processes. A more controlled option is to use an actor pool. In this case, the pool provides a controlled set of workers that are readily available (with no process creation delay) for execution. As the pool maintains its own request queue, the programming model for this option is identical to starting independent remote functions but provides a better-controlled execution environment.
Conclusion
In this chapter, you learned how to use Ray remote actors to implement stateful execution in Ray. You learned about the actor model and how to implement Ray remote actors. Note that Ray internally relies heavily on actors, for example, for multinode synchronization, streaming (see Chapter 6), and microservices implementation (see Chapter 7). Actors are also widely used for ML implementations; see, for example, the use of actors for implementing a parameter server.
You also learned how to improve an actor’s reliability by implementing an actor’s persistence and saw a simple example of persistence implementation.
Finally, you learned about the options that Ray provides for scaling actors, their implementation, and trade-offs.
In the next chapter, we will discuss additional Ray design details.
1 Python exceptions are not considered system errors and will not trigger restarts. Instead, the exception will be saved as the result of the call, and the actor will continue to run as normal.
2 In this implementation, we are using filesystem persistence, but you can use the same approach with other types of persistence, such as S3 or databases.
3 A coarse-grained actor is a single actor that may contain multiple pieces of state. In contrast, in a fine-grained approach, each piece of state would be represented as a separate actor. This is similar to the concept of coarse-grained locking.