MicroPython Projects

4.7 (3 reviews total)
By Jacob Beningo
    Advance your knowledge in tech with a Packt subscription

  • Instant online access to over 7,500+ books and videos
  • Constantly updated with 100+ new titles each month
  • Breadth and depth in over 1,000+ technologies
  1. Managing Real-Time Tasks

About this book

With the increasing complexity of embedded systems seen over the past few years, developers are looking for ways to manage them easily by solving problems without spending a lot of time on finding supported peripherals. MicroPython is an efficient and lean implementation of the Python 3 programming language, which is optimized to run on microcontrollers. MicroPython Projects will guide you in building and managing your embedded systems with ease.

This book is a comprehensive project-based guide that will help you build a wide range of projects and give you the confidence to design complex projects spanning new areas of technology such as electronic applications, automation devices, and IoT applications. While building seven engaging projects, you'll learn how to enable devices to communicate with each other, access and control devices over a TCP/IP socket, and store and retrieve data. The complexity will increase progressively as you work on different projects, covering areas such as driver design, sensor interfacing, and MicroPython kernel customization.

By the end of this MicroPython book, you'll be able to develop industry-standard embedded systems and keep up with the evolution of the Internet of Things.

Publication date:
April 2020
Publisher
Packt
Pages
294
ISBN
9781789958034

 

Managing Real-Time Tasks

Embedded systems need a way to schedule activities and respond to events in an efficient and deterministic manner. MicroPython offers developers several methods to achieve task scheduling.

In this chapter, we will review the methods that are most commonly used by developers and how to use uasyncio to schedule our own real-time tasks.

The following topics will be covered in this chapter:

  • The need for real-time scheduling
  • MicroPython scheduling techniques
  • Writing a scheduling loop with uasyncio
 

Technical requirements

The example code for this chapter can be found in this book's GitHub repository: https://github.com/PacktPublishing/MicroPython-Projects/tree/master/Chapter02

In order to run the examples, you will require the following hardware and software:

  • Pyboard Revision 1.0 or 1.1
  • Pyboard Series-D
  • Terminal application (such as PuTTy, RealTerm, or a Terminal)
  • A text editor (such as VS Code or PyCharm)
 

The need for real-time scheduling

A real-time embedded system is a system with a dedicated purpose. The real-time system may operate standalone or it may be a component or subsystem of a larger device. Real-time systems are often event-driven and must produce the same output and timing when given the same initial conditions. A real-time system might be built using a microcontroller system that uses a bare-metal scheduler or a real-time operating system (RTOS) to schedule all of its system tasks. Alternatively, it could be built using a System on Chip (SoC) or Field Programming Gate Array (FPGA).

Every embedded system is not necessarily a real-time system. An application processor such as Raspberry Pi using Raspbian or Linux would not be a real-time system because, for a given set of inputs, while the system may give the same output, the time taken can vary wildly due to the multitasking nature of the system. General-purpose operating systems often interrupt tasks to handle OS-related functions, which results in the computing time being variable and non-deterministic.

There are several characteristics that can be used to identify a real-time embedded system:

  • They're event-driven as they do not poll inputs.
  • They're deterministic because when given the same initial conditions, they produce the same outputs in the same time frame.
  • They're resource-constrained in some manner; for example, clock speed, memory, or energy consumption.
  • They use a dedicated microcontroller-based processor.
  • They may use an RTOS to manage system tasks.

Real-time system types

Real-time systems can be subdivided into two categories: soft real-time and hard real-time systems. Both types require that the system executes in a deterministic and predictable manner. However, they differ in what happens if a deadline is missed. A soft real-time system that misses a deadline is considered to be annoying to its users. It's undesirable for the deadline to be missed and may decrease the usefulness of the system after the deadline, but it's not critical. A hard real-time system, on the other hand, will dramatically decrease its usefulness after a deadline and results in a fatal system fault.

An example of a soft real-time system is a Human Machine Interface (HMI) with a touch controller that is controlling a home furnace. There may be a deadline where the system needs to respond to user input within 1 second of the screen being touched. If a user goes and touches the screen but the system doesn't respond for 3 or 4 seconds, the result is not world ending, but it may make the user complain about how slow the system is.

A hard real-time system could be an electronic braking system that needs to respond to a user pressing the brake pedal within 30 milliseconds. If a user were to press the brake and it took 2 seconds for the brakes to respond, the outcome could be critical. The system's failure to respond could result in injury to the user and dramatically decreases the usefulness of the embedded system.

It is possible to have an embedded system that has a mix of hard and soft requirements. The software in an embedded system is often subdivided into separate tasks based on function and timing requirements. We might find that the user interface on a system is considered to have soft real-time requirements, while the actuator control task must have hard real-time requirements. The type of system that is being built will often factor in the type of scheduler that is used in the solution.

Now, let's explore the different scheduling architectures that can be used with MicroPython to achieve real-time performance.

 

MicroPython scheduling techniques

When it comes to real-time scheduling using MicroPython, there are five common techniques that developers can employ. These techniques are as follows:

  • Round-robin scheduling
  • Periodic scheduling using timers
  • Event-driven scheduling
  • Cooperative scheduling
  • MicroPython threads

We'll discuss them in detail in the subsequent sections. In the rest of this chapter, we will build example projects to explore several of these scheduling paradigms. We will also give special treatment to the uasyncio library at the end of this chapter, which is a powerful library for scheduling in MicroPython.

Round-robin scheduling

Round-robin scheduling is nothing more than an infinite loop that is created with a while loop. Inside the loop, developers add their task code and each task is executed sequentially, one after the other. While round-robin is the easiest and simplest scheduling paradigm to implement, there are several problems that developers will encounter when using it. First, getting the application tasks to run at the right rates can be difficult. Any code that is added or removed from the application will result in changes to the loop timing. The reason for this is that there is now more or less code to execute per loop. Second, each task has to be designed to recognize that there are other tasks, which means that they cannot block or wait for an event. They must check and then move on so that the other code has the opportunity to use the processor.

Round-robin scheduling can also be used with interrupts to handle any real-time events that might be occurring in the system. The loop handles all the soft real-time tasks, and then the hard real-time tasks are allocated to interrupt handlers. This helps to provide a balance that ensures each type is executed within a reasonable period of time. Round-robin is a good technique for beginners who are just trying to get a simple application up and running.

As we discussed earlier, adding or removing code affects the loop time, which can affect how the system performs. Round-robin schedulers can handle soft real-time tasks. Any events or hard real-time requirements need to be handled using interrupts. I often refer to this as round-robin scheduling with interrupts. A flowchart showing round-robin scheduling with interrupts can be seen in the following diagram:

The main round-robin loop is often referred to as the background loop. This loop constantly executes in the background when there are no interrupts executing. The interrupts themselves are referred to as the foreground and handle any hard real-time events that need to be handled by the system. These functions trump background tasks and run immediately. It's also important to note that MicroPython handles clearing the interrupt flags for developers, so while they are shown in the preceding diagram, this detail is abstracted and handled by the MicroPython kernel.

In C, an application that uses round-robin scheduling might look something like the following:

int main (void)
{
// Initialize the Microcontroller Unit (MCU) peripherals
System_Init();

while(1)
{
Task1();

Task2();

Task3();
}
// The application should never exit. Return 1 if
// we do reach this point!

return 1;
}

In this example, the code enters into the main function, initializes the microcontroller, and then enters into an infinite while loop that calls each task in order. This is a design pattern that every embedded software developer will have seen early in their career and should be quite familiar with.

Implementing round-robin in MicroPython is very similar:

  1. First, it's important to recall that the application entry for MicroPython is located within main.py. To access any peripherals, the pyb library needs to be imported into the application (or the machine library for code that can be ported across MicroPython ports).
  2. Second, any initialization and task functions need to be defined above the main loop. This ensures that they are defined before they are called by the Python interpreter.
  3. Finally, an infinite loop is created using a while True statement. Each defined task is entered into this loop. The loop's timing can be controlled and tuned using pyb.delay().

Building a task manager using round-robin scheduling

Let's look at an example application that generates an LED railroad lights pattern. From a hardware perspective, this requires the use of two LEDs on the pyboard, such as the blue and yellow LEDs (on the pyboard series-D, you might use the green and blue LEDs). I prefer to use these because when we save new code to the pyboard, the red LED is used to show that the filesystem is being written to, and we don't want to interfere with that indicator. If we want one LED to be on while the other is off and then toggle them back and forth, we will need to initialize the blue LED to be on and the yellow to be off. We can then create two separate tasks, one to control the yellow LED and the other to control the blue LED. The Python code for this is as follows:

import pyb   # For uPython MCU features
import time

# define LED color constants
LED_RED = 1
LED_GREEN = 2
LED_BLUE = 3

LED_YELLOW = 4

def task1():
pyb.LED(LED_BLUE).toggle()

def task2():
pyb.LED(LED_GREEN).toggle()

However, the application is not complete until we initialize the LEDs and schedule the tasks to run. The following code shows the LED railroad application's initialization and task execution being written using round-robin scheduling. The main loop is delayed by 150 milliseconds, as well as each loop using the sleep_ms method from the time module. Importing time actually imports the utime module, but importing time can make porting code a little bit easier:

# Setup the MCU and application code to starting conditions
# The blue LED will start on, the yellow LED will be off
pyb.LED(LED_BLUE).on()
pyb.LED(LED_GREEN).off()

# Main application loop
while True:
# Run the first task
task1()

# Run the second task
task2()

# Delay 150 ms
pyb.delay(150)

These two code blocks, when combined, provide us with our first MicroPython application. Running the application on the pyboard can be done by copying the main.py script onto the development board. This can be done either directly, through a Python IDE such as PyCharm, or manually using the following steps:

  1. Connect the pyboard to your computer with a USB cable.
  2. Open your Terminal application and connect to the pyboard (refer to the MicroPython documentation | Quick reference for the pyboard | MicroPython tutorial for the pyboard | 3. Getting a MicroPython REPL prompt, for details).
  3. In the serial Terminal, press Ctrl + C to interrupt any currently running scripts.
  4. Copy the script to the pyboard USB drive. While the copy is in progress, the red LED will be lit up.
  5. Once the red light has gone off, the pyboard flash system will be updated.
  6. In the Terminal, press Ctrl + D to perform a soft reset.
For additional methods regarding how to deploy the application and develop within the PyCharm environment, refer to the Appendix, Downloading and Running MicroPython Code.

Now, you should see the blue and green LEDs toggling back and forth.

Periodic scheduling using timers

There may be applications where every task that needs to be executed is periodic, such as a push button that needs to be sampled every 10 milliseconds; a display that needs to be updated 60 times per second; or a sensor that is sampled at 10 Hz or interrupts when a value has gone out of range. In purely periodic systems, developers can architect their software to use periodic timers to execute tasks. Each timer can be set up to represent a single task that is executed at the desired rate. When the timer interrupt fires, the task executes.

When using periodic timers for task scheduling, it's important to keep in mind that the task code will be executed from an interrupt handler. Developers should follow best practices for using interrupts, such as the following:

  • Keep ISRs short and fast.
  • Perform measurements to understand interrupt timing and latency.
  • Use interrupt priority settings to emulate pre-emption.
  • Make sure that task variables are declared as volatile.
  • Avoid calling multiple functions from an ISR.
  • Disable interrupts as little as possible.
  • Use micropython.schedule() to schedule a function to execute as soon as the MicroPython scheduler is able to.

When using periodic timers to schedule tasks, some of these best practices can be bent slightly. However, if the developer carefully monitors their task timing, bending the rules shouldn't be an issue. If it is, then any hard real-time activity can be handled by the interrupt task and then a round-robin loop can be notified to finish processing the task at a later time.

Timers guarantee that the task will be executed at a regular interval, no matter what is being executed, assuming that a higher-priority interrupt is not executing. The key thing to remember is that these tasks are executed within an interrupt, so the tasks need to be kept short and fast! Developers who use this method should handle any high-priority activity in the task and then offload the rest of the task to the background. For example, a task that handles an incoming byte over a Universal Asynchronous Receiver/Transmitter (UART) device can process the incoming byte by storing it in a circular buffer and then allowing a background task to later process the circular buffer. This keeps the interrupt task short and sweet while allowing the lower-priority processing to be done in the background.

Interrupts within MicroPython are also special in that they are garbage collector (gc) locked. What this means to a developer is that you cannot allocate memory in an ISR. All memory, classes, and so on need to be allocated before being used by the ISR. This has an interesting side effect in that if something goes wrong while executing an ISR, the developer has no way of knowing what went wrong! To get traceback information in situations where memory can't be allocated, such as in ISRs, developers can use the MicroPython emergency exception buffer. This is done by adding the following line of code to either the top of main.py or boot.py:

micropython.alloc_emergency_exception_buf(100) 

This line of code is used to allocate 100 bytes to store the traceback information for ISRs and any other tracebacks that occur in areas where memory cannot be allocated. If an exception occurs, the Python traceback information is saved to this buffer and then printed to the REPL. This allows a developer to then figure out what they did wrong and correct it. The value of 100 is recommended as the buffer size by the MicroPython documentation.

When considering using timers for tasks, it's also important to recognize that each time an interrupt fires on an Arm Cortex®-M processor, there is a 12–15 clock cycle overhead to switch from the main code to the interrupt and then again to switch back. The reason for this overhead is that the processor needs to save and restore context information for the application when switching into and out of the interrupts. The nice thing is that these transitions, while they consume clock cycles, are deterministic!

Building a task manager using periodic scheduling

Setting up a timer to behave as a periodic task is exactly the same as setting up a timer in MicroPython for any other purpose. We can create an application very similar to our round-robin scheduler using timers by initializing a timer for each task in the application. The first timer will control the blue LED, while the second will control the green LED. Each timer will use a callback function to the task code that will be executed when the timer expires.

We can use the exact same format for our code that we used previously. We will initialize the blue LED as on, and the green LED as off. This allows us to let the timers free-run and generate the railroad pattern that we saw earlier. It's important to note that if we let the timer free-run, even if we stop the application in the REPL, the timers will continue to execute! The reason for this is that the timers are hardware peripherals that will run until the peripheral is disabled, even if we exit our application and return to the REPL. I mention this because any print statements you add to your callback functions will continue to populate the REPL, even after you halt the program, which can make it difficult to work or determine the state of the application.

When using timers to set up tasks, there is no need for an infinite while loop like we saw with the round-robin applications. The timers will just free-run. If the infinite loop is not added to main.py, background processing will fall back to the system REPL and sit there instead. I personally still like to include the while loop and some status information so that I know whether the MicroPython interpreter is executing code. In this example, we will put a sleep delay in the main loop and then calculate how long the application has been running.

The Python code for our tasks is identical to the round-robin example, except for the addition of the emergency exception buffer, as shown here:

import micropython # For emergency exception buffer
import pyb # For uPython MCU
import time

micropython.alloc_emergency_exception_buf(100)

LED_RED = 1
LED_GREEN = 2
LED_BLUE = 3
LED_YELLOW = 4

def task1(timer):
pyb.LED(LED_BLUE).toggle()

return

def task2(timer):
pyb.LED(LED_GREEN).toggle()

return

Instead of calling the task code directly, we set up two timers time 1, and timer 2 with a frequency of 5 Hz (period of 200 milliseconds) and set up the callback function to call the tasks. The code to accomplish this is as follows:

pyb.LED(LED_BLUE).on()
pyb.LED(LED_GREEN).off()

# Create task timer for Blue LED
TimerBlueLed = pyb.Timer(1)
TimerBlueLed.init(freq=5)
TimerBlueLed.callback(task1)
print("Task 1 - Blue LED Toggle initialized ...")

# Create task timer for Green LED
TimerGreenLed = pyb.Timer(2)
TimerGreenLed.init(freq=5)
TimerGreenLed.callback(task2)
print("Task 2 - Green LED Toggle initialized ...")

The only code that's necessary for this example is the code for the main loop, which will do nothing more than print out how long our application has been running. To accomplish this, we need to sample the application start time using the time module's ticks_ms method and store it in TimeStart. We can then use time.ticks_diff to calculate the elapsed time between the current tick and the application start tick. The final piece of code is as follows:

TimeStart = time.ticks_ms()

while True:
time.sleep_ms(5000)
SecondsLive = time.ticks_diff(time.ticks_ms(), TimeStart) / 1000
print("Executing for ", SecondsLive, " seconds")

Once the code is on the pyboard and executing, the REPL should display the information shown in the following screenshot. It shows timer-based task scheduling, which prints the current execution time in the REPL and toggles between the blue and green LEDs at 5 Hz. At this point, you know how to use timers to schedule periodic tasks:

At this point, we are ready to examine some additional scheduling paradigms that are not completely mainstream within MicroPython, such as thread support.

MicroPython thread mechanism

The last scheduling paradigm that developers can use to schedule tasks is the MicroPython thread mechanism. In a microcontroller-based system, a thread is essentially a synonym for a task. There are some minor differences, but they are beyond the scope of this book. Developers can create threads that will contain task code. Each task could then use several different mechanisms to execute their task code, such as the following:

  • Waiting on a queue
  • Waiting on time using a delay
  • Periodically monitoring for a polled event

The thread mechanism has been implemented directly from Python 3.x and provides developers with an easy method for creating separate tasks in their application. It is important to recognize that the Python thread mechanism is NOT deterministic. This means that it will not be useful for developing software that has a hard real-time requirement. The MicroPython thread mechanism is also currently experimental! Threads are not supported in all MicroPython ports and for the ones that are, a developer usually needs to enable threads and recompile the kernel in order to have access to the capability on offer.

For additional information on threads and their behavior, please refer to the Further reading section at the end of this chapter.

Starting with MicroPython version 1.8.2, there is support for an experimental threads module that developers can use to create separate threads. Using the threads module is not recommended for developers who are just getting started with MicroPython for several reasons. First, by default, threading is not enabled in the MicroPython kernel. Developers need to enable threading and then recompile and deploy the kernel. Second, since the threading module is experimental, it has not been ported to every MicroPython port yet.

If threads aren't officially supported and not recommended, why are we even talking about them? Well, if we want to understand the different scheduling mechanisms available to us with MicroPython, we need to include the mechanisms that are even experimental. So, let's dive in and talk about threading with MicroPython (even though you may not be able to run a threading application until you have learned how to recompile the kernel, which you will do in Chapter 5, Customizing the MicroPython Kernel Start Up Code).

When a developer creates a thread, they are creating a semi-independent program. If you think back to what a typical program looks like, it starts with an initialization section and then enters into an infinite loop. Every thread has this structure! There is a section to initialize the thread and its variables, followed by an independent loop. The loop itself can be periodic by using time.sleep_ms() or it can block an event, such as an interrupt.

Advantages of using threads in MicroPython

From an organizational standpoint, threads can be a good choice for many MicroPython applications, although similar behavior can be achieved using the asyncio library (which we will talk about shortly). There are several advantages that threads provide, such as the following:

  • They allow a developer to easily break up their program into smaller constituents that can be assigned to individual developers.
  • They help us improve the code so that it's scalable and reusable.
  • They provide us with a small opportunity to decrease bugs in an application by breaking the application up into smaller, less complex pieces. However, as we mentioned previously, more bugs can be created by developers who are unfamiliar with how to use threads properly.

Considerations when using threads in MicroPython

For a Python programmer, before using threads in a MicroPython application, it makes a lot of sense to consider the potential consequences before immediately jumping to threads. There are a few important considerations that a developer needs to contemplate:

  • Threads are not deterministic. When a Python thread is ready to execute, there is no mechanism in place for one thread to be executed before another.
  • There is no real mechanism for controlling time slicing. Time slicing is when the CPU is shared between multiple threads that are currently ready to execute.
  • To pass data around the application, developers may need to add additional complexities to their design, such as the use of queues.
  • Developers who are not familiar with designing and implementing multi-threaded applications will find that inter-thread communication and syncing is full of pitfalls and traps. More time will be spent debugging and new developers will find that the other methods we've discussed are more appropriate for their applications.
  • Support for threading is currently experimental in MicroPython (see https://docs.micropython.org/en/latest/library/_thread.html).
  • Threads are not supported on all MicroPython ports, so the applications may be less portable than expected.
  • Threads will use more resources than the other techniques we've discussed in this chapter.

Building a task manager using threads

Despite a few drawbacks to using threads, they can be a very powerful tool for developers who understand how to use them in the context of a real-time embedded system. Let's take a look at how we can implement our railroad blinky LED application using threads. The first step to developing the application is to create our threads, just like how we created our tasks in the previous examples. In this case, though, there are several key modifications that are worth noting.

First, we need to import the threading module (_thread). Second, we need to define a thread as a regular function declaration. The difference here is that we treat each function like a separate application where we insert a while True statement. If the thread were to exit the infinite loop, the thread would cease operating and not use any more CPU time.

In this example, we're controlling the LED toggling time by using the time.sleep_ms function and setting our thread loop time to 150 milliseconds, just like we did in the previous examples. Our code now looks as follows:

import micropython # For emergency exception buffer
import pyb # For uPython MCU features
import time # For time features
import _thread # For thread support

micropython.alloc_emergency_exception_buf(100)

LED_RED = 1
LED_GREEN = 2
LED_BLUE = 3
LED_YELLOW = 4

def task1():
while True:
pyb.LED(LED_BLUE).toggle()
time.sleep_ms(15
0)

def task2():
while True:
pyb.LED(LED_GREEN).toggle()
time.sleep_ms(25
0)

We can initialize the system the exact same way that we did before by initializing the blue LED to on and the green LED to off. The difference in our thread application is that we want to write some code that will spawn off our two threads. This can be done with the following code:

pyb.LED(LED_BLUE).on()
pyb.LED(LED_GREEN).off()

_thread.start_new_thread(task1, ())
_thread.start_new_thread(task2, ())

As you can see, we're using the _thread.start_new_thread method here. This method requires two parameters. The first is the function that should be called when the thread is ready to run. In this case, these are our Led_BlueToggle and Led_YellowToggle functions. The second parameter is a tuple that needs to be passed to our threads. In this case, we have no parameters to pass, so we just pass an empty tuple.

Before running this code, it's useful to note that the rest of the script is the same as the code in our timer example. We create an infinite loop for the script and then report how long the application has been running for. As a reminder, the code for this is as follows:

TimeStart = time.ticks_ms()

while True:
time.sleep_ms(5000)
SecondsLive = time.ticks_diff(time.ticks_ms(), TimeStart) / 1000
print("Executing for ", SecondsLive, " seconds")

An interesting question to ask yourself as you run the threaded code is, How long will it take before these LEDs are no longer blinking in an alternating pattern? Since the threads are not deterministic, over time, there is the potential for these threads to get out of sync and for the application to no longer behave the way that we expect it to. If you are going to run the code, let it run for a while, over several hours, a day, or even a week, and observe the application's behavior.

Event-driven scheduling

Event-driven scheduling can be an extremely convenient technique for developers whose systems are driven by events that are happening on the system. For example, the system may need to respond to a user button press, an incoming data packet, or a limit switch being reached by an actuator.

In event-driven systems, there may be no need to have a periodic background timer; instead, the system can just respond to the event using interrupts. Event-driven scheduling may have our common infinite while loop, but that loop will do nothing or put the system into a low-power state until an event occurs. Developers who are using event-driven systems can follow the interrupt best practices that we discussed earlier and should also read the MicroPython documentation on ISR rules, which can be found at https://docs.micropython.org/en/latest/reference/isr_rules.html. It's important to note that when you do use interrupts, MicroPython automatically clears the interrupt flag for the developer so that using interrupts is simplified.

Cooperative scheduling

Cooperative scheduling is a technique that developers can leverage to achieve task periodicity without using a timer for every task. Cooperative schedulers are one of the most widely used schedulers throughout embedded system history. A quick look at any of the embedded.com embedded systems surveys will easily show that.

A cooperative scheduler often uses a single timer to create a system tick that the scheduler then uses to determine whether the task code should be executed. The cooperative scheduler provides a perfect balance for developers who need periodicity, simplicity, flexibility, and scalability. They are also a stepping stone toward an RTOS.

So far, we have examined the methods that developers can use in MicroPython to schedule activities. In the next section, we will discuss how we can use the asyncio library to cooperatively schedule tasks. This method is perhaps the most commonly used method by MicroPython developers due to its flexibility and precise timing beyond the methods that we have already examined.

 

Cooperative multitasking using asyncio

So far, we have examined how we can schedule tasks in a MicroPython-based system using round-robin, timers, and threads. While threads may be the most powerful scheduling option available, they aren't deterministic schedulers and don't fit the bill for most MicroPython applications. There is another scheduling algorithm that developers can leverage to schedule tasks within their systems: cooperative scheduling.

A cooperative scheduler, also known as cooperative multitasking, is basically a round-robin scheduling loop that includes several mechanisms to allow a task to yield the CPU to other tasks that may need to use it. The developer can fine-tune the way that their application behaves, and their tasks execute without adding the complexity that is required for a pre-emptive scheduler, like those included in an RTOS. Developers who decide that a cooperative scheduler fits their application best will need to make sure that each task they create can complete before any other task needs to execute, hence the name cooperative. The tasks cooperate to ensure that all the tasks are able to execute their code within their requirements but are not held to their timing by any mechanism.

Developers can develop their own cooperative schedulers, but MicroPython currently provides the asyncio library, which can be used to create cooperatively scheduled tasks and to handle asynchronous events in an efficient manner. In the rest of this chapter, we will examine asyncio and how we can use it for task scheduling within our embedded applications.

Introducing asyncio

The asyncio module was added to Python starting in version 3.4 and has been steadily evolving ever since. The purpose of asyncio is to handle asynchronous events that occur in Python applications, such as access to input/output devices, a network, or even a database. Rather than allowing a function to block the application, asyncio added the functionality for us to use coroutines that can yield the CPU while they wait for responses from asynchronous devices.

MicroPython has supported asyncio in the kernel since version 1.11 through the uasyncio library. Prior versions still supported asyncio, but the libraries had to be added manually. This could be done through several means, such as the following:

  • Copying the usyncio library to your application folder
  • Using micropip.py to download the usyncio library
  • Using upip if there is a network connection

If you are unsure whether your MicroPython port supports asyncio, all you need to do is type the following into the REPL:

import usyncio

If you receive an import error, then you know that you need to install the library before continuing. Peter Hinch has put together an excellent guide regarding asyncio with instructions for installing the library that you can find at https://github.com/peterhinch/micropython-async/blob/master/TUTORIAL.md#0-introduction.

It's important to note that the support for asyncio in MicroPython is for the features that were introduced in Python 3.4. Very few features from the Python 3.5 or above asyncio library have been ported to MicroPython, so if you happen to do more in-depth research into asyncio, please keep this in mind to avoid hours of debugging.

The main purpose of asyncio is to provide developers with a technique for handling asynchronous operations in an efficient manner that doesn't block the CPU. This is done through the use of coroutines, which are sometimes referred to as coros. A coroutine is a specialized version of a Python generator function that can suspend its execution before reaching a return and indirectly passes control to another coroutine. Coroutines are a technique that provides concurrency to a Python application. Concurrency basically means that we can have multiple functions that appear to be executing at the same time but are actually running one at a time in a cooperative manner. This is not parallel processing but cooperative multitasking, which can dramatically improve the scalability and performance of a Python application compared to other synchronous methods.

The general idea behind asyncio is that a developer creates several coroutines that will operate asynchronously with each other. Each coroutine is then called using a task from an event loop that schedules the tasks. This makes the coroutines and tasks nearly synonymous. The event loop will execute a task until it yields execution back to the event loop or to another coroutine. The coroutine may block waiting for an I/O operation or it may simply sleep if the coroutine wants to execute at a periodic interval. It's important to note, however, that if a coroutine is meant to be periodic, there may be jitter in the period, depending on the timing for the other tasks and when the event loop can schedule it to run again.

The general behavior for how coroutines work can be seen in the following diagram, which represents an overview of using coroutines with the asyncio library. This diagram is a modified version of the one presented by Matt Trentini at Pycon AU in 2019 during his talk on asyncio in MicroPython:

As shown in the preceding diagram, the Event Loop schedules a task to be executed that has 100% of the CPU until it reaches a yield point. A yield point is a point in the coroutine where a blocking operation (asynchronous operation) will occur and the coroutine is then willing to give up the CPU until the operation is completed. At this point, the event loop will then schedule other coroutines to run. When the asynchronous event occurs, a callback is used to notify the Event Loop that the event has occurred. The Event Loop will then mark the original coroutine as ready to run and will schedule it to resume when other coroutines have yielded the CPU. At that point, the coroutine can resume operation, but as we mentioned earlier, there could be some time that elapses between the receipt of the callback and the coroutine resuming execution, and this is by no means deterministic.

Now, let's examine how we can use asyncio to rewrite our blinky LED application using cooperative multitasking.

A cooperative multitasking blinky LED example

The first step in creating a railroad blinky LED example is to import the asyncio library. In MicroPython, there is not an asyncio library exactly, but a uasyncio library. To improve portability, many developers will import uasyncio as if it were the asyncio library by importing it at the top of their application, as follows:

import uasyncio as asyncio

Next, we can define our LEDs, just like we did in all our other examples, using the following code:

LED_RED = 1
LED_GREEN = 2
LED_BLUE = 3
LED_YELLOW = 4

If you look back at our example of writing a thread-based application, you'll recall that our task1 code looked as follows:

def task1():
while True:
pyb.LED(LED_BLUE).toggle()
time.sleep_ms(150)

def task2():
while True:
pyb.LED(LED_GREEN).toggle()
time.sleep_ms(150)

This is important to review because creating a coroutine will follow a similar structure! In fact, to tell the Python interpreter that our tasks are asynchronous coroutines, we need to add the async keyword before each of our task definitions, as shown in the following code:

async def task1():
while True:
pyb.LED(LED_BLUE).toggle()
time.sleep_ms(150)
async def task2():
while True:
pyb.LED(LED_GREEN).toggle()
time.sleep_ms(150)

The functions are now coroutines, but they are missing something very important: a yield point! If you examine each of our tasks, you can tell that we really want our coroutine to yield once we have toggled our LED and are going to wait 150 milliseconds. The problem with these functions as they are currently written is that they are making a blocking call to time.sleep_ms. We want to update this with a call to asyncio.sleep_ms and we want to let the interpreter know that we want to relinquish the CPU at this point. In order to do that, we are going to use the await keyword.

The await keyword, when reached by the coroutine, tells the event loop that it has reached a point in its execution where it will be waiting for an event to occur and it is willing to give up the CPU to another task. At this point, control is handed back to the event loop and the event loop can decide what task should be executed next. Using this syntax, our task code for the railroad blinky LED applications would be updated to the following:

async def task1():
while True:
pyb.LED(LED_BLUE).toggle()
await asyncio.sleep_ms(150)
async def task2():
while True:
pyb.LED(LED_GREEN).toggle()
await asyncio.sleep_ms(150)

For the most part, the general structure of our coroutine/task functions remains the same. The difference is that we define the function as async and then use await where we expect the asynchronous function call to be made.

At this point, we just initialize the LEDs using the following code:

pyb.LED(LED_BLUE).on()
pyb.LED(LED_GREEN).off()

Then, we create our event loop.

Creating the event loop for this application requires just four lines of code. The first line will assign the asyncio event loop to a loop variable. The next two lines create tasks that assign our coroutines to the event loop. Finally, we tell the event loop to run forever and our coroutines to execute. These four lines of code look as follows:

loop = asyncio.get_event_loop()
loop.create_task(task1())
loop.create_task(task2())
loop.run_forever()

As you can see, we can create any number of tasks and pass the desired coroutine to the create_task method in order to get them into the event loop. At this point, you could run this example and see that you have an efficiently running railroad blinky LED program that uses cooperative multitasking.

Going further with asyncio

Unfortunately, there just isn't enough time to discuss all the cool capabilities that are offered by asyncio in MicroPython applications. However, as we progress through this book, we will use asyncio and its additional capabilities as we develop our various projects. For those of you who want to dig deeper right now, I would highly recommend checking out Peter Hinch's asyncio tutorial, which also covers how you can coordinate tasks, use queues, and more, with asyncio. You can find the tutorial and some example code at https://github.com/peterhinch/micropython-async/blob/master/TUTORIAL.md#0-introduction.

 

Summary

In this chapter, we explored several different types of real-time scheduling techniques that can be used with a MicroPython project. We found that there are many different techniques that a MicroPython developer can leverage to schedule activities in their application. We found that each of these techniques has its place and varies based on the level of complexity a developer wants to include in their scheduler. For example, MicroPython threads can be used, but they are not fully supported in every MicroPython port and should be considered an in-development feature.

After looking at several techniques, we saw that the asyncio library may be the best choice for developers looking to get started with MicroPython. Python developers are already familiar with it and asyncio provides developers with cooperative scheduling capabilities that can provide them with the ability to handle asynchronous events in an efficient, non-blocking manner. This allows developers to get more out of their applications while wasting fewer cycles.

In the next chapter, we will explore how we can write drivers for a simple application that uses a push button to control the state of its RGB LEDs.

 

Questions

  1. What characteristics define a real-time embedded system?
  2. What four scheduling algorithms are commonly used with MicroPython?
  3. What best practices should a developer follow when using callbacks in MicroPython?
  4. What process should be followed to load new code onto a MicroPython board?
  5. Why would a developer place micropython.alloc_emergency_exception_buf(100) in their application?
  6. What reasons might deter a developer from using the _thread library?
  7. What keywords indicate that a function is being defined as a coroutine?
 

About the Author

  • Jacob Beningo

    Jacob Beningo is an independent consultant who specializes in microcontroller-based embedded systems. He has advised, coached, and developed systems across multiple industries, including the automotive, defense, industrial, medical, and space sectors. Jacob enjoys working with companies to help them develop and improve their processes and skill sets. He publishes a monthly newsletter, Embedded Bytes, and blogs for publications about embedded system design techniques and challenges. Jacob holds bachelor's degrees in electrical engineering, physics, and mathematics from Central Michigan University and a master's degree in space systems engineering from the University of Michigan.

    Browse publications by this author

Latest Reviews

(3 reviews total)
pas de description précise du hardware!
Um excelente preço, por um livro que me parece bem escrito e com informação útil.
Excellent introduction to MicroPython and embedded systems.

Recommended For You

Book Title
Unlock this book and the full library for only $5/m
Access now