Capturing a video stream from a camera in Python can be done with OpenCV. However, when the capture runs on the main thread, performance is poor, especially at HD resolution, because each read blocks until the camera delivers a frame. This blog post shows a solution: run the video capture operation in a separate thread. The frame rate of the read loop increases dramatically, as shown below (on a MacBook Pro):

For 640×480:

[i] Frames per second: 28.71, async=False
[i] Frames per second: 81.67, async=True

For 1280×720:

[i] Frames per second: 15.02, async=False
[i] Frames per second: 52.04, async=True

First, write a VideoCapture class. The class needs:

  • an __init__ function that opens the video capture stream, sets the frame dimensions and creates a lock object for thread-safe assignment and copying of the frames.
  • a start function to create and start the thread
  • an update function that will be called asynchronously.
  • a read function that we will call from our code to retrieve a new frame.
  • a stop function to stop (join) the thread
  • an __exit__ function to clean up some resources.
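
The structure listed above can be sketched without a camera. The block below is only an illustration of the pattern, not the class from this post: LatestValueAsync and make_counter are hypothetical names, and the counter source stands in for the blocking camera read. The worker thread fetches a value outside the lock and assigns it inside the lock, while read returns the most recent value.

```python
import threading
import time


class LatestValueAsync:
    """Keeps the most recent value produced by `source` (a hypothetical callable)."""

    def __init__(self, source):
        self.source = source
        self.value = source()            # prime with a first value, like the first frame
        self.started = False
        self.lock = threading.Lock()

    def start(self):
        self.started = True
        self.thread = threading.Thread(target=self.update, daemon=True)
        self.thread.start()
        return self

    def update(self):
        # Runs in the worker thread: fetch outside the lock, assign inside it.
        while self.started:
            value = self.source()
            with self.lock:
                self.value = value

    def read(self):
        # Called from the main thread: returns the latest value without waiting.
        with self.lock:
            return self.value

    def stop(self):
        self.started = False
        self.thread.join()


def make_counter(delay=0.001):
    """Hypothetical stand-in for a camera: returns 0, 1, 2, ... with a small delay."""
    state = {"n": -1}

    def source():
        time.sleep(delay)
        state["n"] += 1
        return state["n"]

    return source
```

Holding the lock only for the assignment keeps the slow fetch from blocking readers.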

The beautiful part of this class is that it enables you to update existing code with minimal change. You only have to change the line containing

cap = cv2.VideoCapture()

to

cap = VideoCaptureAsync()

and add

cap.start()

and

cap.stop()

just before and just after the capture read loop, respectively. That’s it, very easy.
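
As a rough sketch of that change, the helper below runs the same read loop against either kind of capture object. capture_frames is a hypothetical name, and the check for a start attribute is an assumption used here to tell the async wrapper apart from cv2.VideoCapture, which has no start or stop method.

```python
def capture_frames(cap, n_frames):
    """Read up to n_frames from cap (cv2.VideoCapture or VideoCaptureAsync)."""
    is_async = hasattr(cap, "start")   # only the async wrapper has start()/stop()
    if is_async:
        cap.start()                    # added just before the read loop
    frames = []
    try:
        for _ in range(n_frames):
            grabbed, frame = cap.read()
            if not grabbed:
                break                  # camera failed or stream ended
            frames.append(frame)
    finally:
        if is_async:
            cap.stop()                 # added just after the read loop
    return frames
```

The read loop itself is identical in both cases; only the start and stop calls around it differ.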

Here is the code for the VideoCaptureAsync class:

# file: videocaptureasync.py
import threading
import cv2

class VideoCaptureAsync:
    def __init__(self, src=0, width=640, height=480):
        self.src = src
        self.cap = cv2.VideoCapture(self.src)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        self.grabbed, self.frame = self.cap.read()
        self.started = False
        self.read_lock = threading.Lock()

    def set(self, var1, var2):
        self.cap.set(var1, var2)

    def start(self):
        if self.started:
            print('[!] Asynchronous video capturing has already been started.')
            return None
        self.started = True
        self.thread = threading.Thread(target=self.update, args=())
        self.thread.start()
        return self

    def update(self):
        while self.started:
            grabbed, frame = self.cap.read()
            with self.read_lock:
                self.grabbed = grabbed
                self.frame = frame

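    def read(self):
        with self.read_lock:
            # The frame is None when the last grab failed, so guard the copy.
            frame = self.frame.copy() if self.frame is not None else None
            grabbed = self.grabbed
        return grabbed, frame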

    def stop(self):
        self.started = False
        self.thread.join()

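    def __enter__(self):
        # Needed so the object can be used in a `with` statement.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Stop the worker thread if it is still running, then free the camera.
        if self.started:
            self.stop()
        self.cap.release()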
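
If the read loop raises before stop() is reached, the worker thread keeps running and can prevent the program from exiting. One possible guard, sketched here under the assumption that the capture object exposes start(), stop() and the underlying cap attribute as in the class above, is a small context manager. The name running is hypothetical.

```python
from contextlib import contextmanager


@contextmanager
def running(cap):
    """Start an async capture and guarantee stop() and release() on exit."""
    cap.start()
    try:
        yield cap
    finally:
        cap.stop()           # joins the worker thread
        cap.cap.release()    # frees the underlying cv2.VideoCapture


# Intended usage (requires a camera):
# with running(VideoCaptureAsync(0)) as cap:
#     grabbed, frame = cap.read()
```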

And here is a small program to test the video capture code:

# file: test.py
import cv2
import time
from videocaptureasync import VideoCaptureAsync

# `async` is a reserved keyword since Python 3.7, hence `use_async`.
def test(n_frames=500, width=1280, height=720, use_async=False):
    if use_async:
        cap = VideoCaptureAsync(0)
    else:
        cap = cv2.VideoCapture(0)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
    if use_async:
        cap.start()
    t0 = time.time()
    i = 0
    while i < n_frames:
        _, frame = cap.read()
        cv2.imshow('Frame', frame)
        cv2.waitKey(1)  # lets the GUI process events so the window redraws
        i += 1
    print('[i] Frames per second: {:.2f}, async={}'.format(n_frames / (time.time() - t0), use_async))
    if use_async:
        cap.stop()
    cv2.destroyAllWindows()


if __name__ == '__main__':
    test(n_frames=500, width=1280, height=720, use_async=False)
    test(n_frames=500, width=1280, height=720, use_async=True)
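
One thing to keep in mind when reading the numbers from this test: read() in the async class returns the most recent frame immediately, whether or not a new one has arrived, so the loop rate can exceed the rate at which the camera delivers frames. The camera-free sketch below illustrates the difference by counting both total reads and distinct frames. FakeAsyncSource and count_reads are hypothetical names, and the sleep calls are stand-ins for the camera read and the display work.

```python
import threading
import time


class FakeAsyncSource:
    """Hypothetical stand-in for VideoCaptureAsync: a new 'frame' every `period` seconds."""

    def __init__(self, period=0.01):
        self.period = period
        self.frame_id = 0
        self.started = False
        self.lock = threading.Lock()

    def start(self):
        self.started = True
        self.thread = threading.Thread(target=self.update, daemon=True)
        self.thread.start()
        return self

    def update(self):
        while self.started:
            time.sleep(self.period)       # simulates the blocking camera read
            with self.lock:
                self.frame_id += 1

    def read(self):
        with self.lock:
            return True, self.frame_id    # returns immediately, new frame or not

    def stop(self):
        self.started = False
        self.thread.join()


def count_reads(cap, duration=0.2):
    """Return (total reads, distinct frames seen) over `duration` seconds."""
    reads, seen = 0, set()
    deadline = time.perf_counter() + duration
    while time.perf_counter() < deadline:
        _, frame_id = cap.read()
        seen.add(frame_id)
        reads += 1
        time.sleep(0.001)                 # stands in for imshow/waitKey work
    return reads, len(seen)
```

When the number of reads is much larger than the number of distinct frames, the loop is mostly redisplaying frames it has already shown.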

GitHub repository