
UNIHIKER: ADS-B Flight Tracker


This tutorial guides you through building an ADS-B (Automatic Dependent Surveillance-Broadcast) flight tracker in Python on the UNIHIKER. ADS-B is a surveillance technology in which an aircraft determines its position via satellite navigation and periodically broadcasts it, which allows the aircraft to be tracked. You will set up the hardware, install the required software on the UNIHIKER's Linux system, write the Python code, and run an application that receives ADS-B data, processes it, and displays the aircraft in a graphical interface.

HARDWARE LIST
1 UNIHIKER
1 RTL-SDR Dongle

In addition, you need a Wi-Fi or Ethernet connection for internet access (to install the required Linux packages).

 

You need the following for this project:

 

UNIHIKER device: The main controller for this project

 

RTL-SDR USB dongle and Antenna (Software-Defined Radio): To receive ADS-B signals

 

 

STEP 1
Preparation (Install packages)

In this step, two essential packages are installed. To do this, start the UNIHIKER and connect to it over SSH. The UNIHIKER must have an internet connection for this step.

CODE
# update package list
$ apt update

# install required packages
$ apt install -y rtl-sdr dump1090-mutability

# install optional package
$ apt install -y ncat
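
Before moving on, it can help to confirm that the tools are actually available on the PATH. The following is a minimal sketch and not part of the original project: `missing_tools` is a hypothetical helper, and the executable names are assumptions based on the packages above (`rtl_test` ships with rtl-sdr, and `dump1090-mutability` is the command used in Step 2).

```python
from shutil import which


def missing_tools(names):
    """Return the executables from names that are not found on the PATH."""
    return [name for name in names if which(name) is None]


if __name__ == '__main__':
    # Assumed executable names; adjust them if your system differs.
    missing = missing_tools(['rtl_test', 'dump1090-mutability'])
    print('Missing:', ', '.join(missing) if missing else 'nothing')
```

If the script reports a missing tool, repeat the corresponding `apt install` command.
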
STEP 2
First Test (to verify)

This step checks whether the system is receiving signals. An internet connection is no longer required. Make sure the RTL-SDR dongle is connected to the UNIHIKER via USB. Optionally, use netcat in a second SSH session to check the raw data that dump1090-mutability outputs.

CODE
# SSH Connection (tty000)

# execute dump1090-mutability
$ dump1090-mutability --interactive --net

# SSH Connection (tty001)

# read the raw messages with netcat
$ nc localhost 30003

If everything works, you should see the following output in both terminals:

 

tty000

 

 

The output shows all aircraft that are nearby.

 

tty001

 

 

The output shows the messages that dump1090-mutability publishes on port 30003 in the BaseStation (SBS) text format.

 

Note: I live near Zurich Airport, so there are a lot of signals. If you don't receive anything, try a different location.
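
The lines on port 30003 are comma-separated text. As a rough sketch, the snippet below names the 22 fields of such a line, which makes it easier to see which columns matter in the next step. The field names follow the commonly documented BaseStation (SBS) layout, `parse_sbs` is a hypothetical helper, and the sample line is made up for illustration (it is not captured output).

```python
SBS_FIELDS = (
    'message_type', 'transmission_type', 'session_id', 'aircraft_id',
    'hex_ident', 'flight_id', 'date_generated', 'time_generated',
    'date_logged', 'time_logged', 'callsign', 'altitude', 'ground_speed',
    'track', 'latitude', 'longitude', 'vertical_rate', 'squawk',
    'alert', 'emergency', 'spi', 'is_on_ground',
)


def parse_sbs(line):
    """Map one SBS line to a dict, or return None if it is incomplete."""
    parts = line.strip().split(',')
    if len(parts) < len(SBS_FIELDS) or parts[0] != 'MSG':
        return None
    return dict(zip(SBS_FIELDS, (part.strip() for part in parts)))


# Made-up sample: 11 filled fields, 10 empty fields, and the on-ground flag.
SAMPLE = ','.join(
    ['MSG', '1', '1', '1', 'ABC123', '1', '2024/05/01', '12:00:00.000',
     '2024/05/01', '12:00:00.000', 'TEST123 '] + [''] * 10 + ['0']
)

if __name__ == '__main__':
    record = parse_sbs(SAMPLE)
    print(record['hex_ident'], record['callsign'])
```

Counting from zero, `hex_ident` is field 4 and `callsign` is field 10.
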

STEP 3
Application Structure and Python Code

In this step, the project structure (folders & files) as well as all the code are created on your local system. When you're finished, the result should look like this:

 

 

Project structure:

 

Create a project directory named “ADS-B” and, inside it, a directory named “lib”. Inside “lib”, create two files: “flight_data.py” and “gui.py”. In the project directory itself, create a file named “main.py”.

 

flight_data.py: This file is responsible for receiving and processing the raw ADS-B flight data.

 

gui.py: This file defines the graphical user interface (GUI) of the application using the Tkinter library.

 

main.py: This file serves as the entry point of the application.

 

flight_data.py

 

This module connects to a data source that streams ADS-B messages, parses the received messages to extract the relevant flight information, updates the internal flight data dictionary, and provides the updated flight data to other parts of the application.
CODE
from socket import socket, AF_INET, SOCK_STREAM
from re import match
from queue import Queue
from time import sleep

flight_data = {}


def should_remove(key: str) -> bool:
    """
    Check if a key should be removed.

    :param key: The key to check.
    :type key: str
    :return: True if the key should be removed, False otherwise.
    """
    if len(key) == 1:
        return True

    if match(r'^\d{4}/\d{2}/\d{2}$', key):
        return True

    return False


def update_flight_data(icao: str, flight: str) -> list:
    """
    Update flight data for the given ICAO code.

    :param icao: The ICAO code of the flight.
    :type icao: str
    :param flight: The flight information to update.
    :type flight: str
    :return: A list of tuples containing the ICAO code and flight information for all flights that should not be removed.
    """
    global flight_data

    # Keep the last known callsign when this message does not carry one.
    if not flight and icao in flight_data:
        flight = flight_data[icao]['Flight']

    flight_data[icao] = {'Flight': flight}

    return [(key, value['Flight']) for key, value in flight_data.items() if not should_remove(key)]

def process_message(message: str, data_queue: Queue) -> None:
    """
    Process the given message and update the flight data.

    :param message: A string containing the message to process.
    :type message: str
    :param data_queue: A Queue object to store the updated flight data.
    :type data_queue: Queue
    :return: None
    """
    parts = message.split(',')

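    # SBS lines are comma-separated: index 4 holds the ICAO hex identifier
    # and index 10 holds the callsign (empty in most message types).
    if len(parts) >= 20: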
        hex_code = parts[4]
        flight_code = parts[10].strip()

        updated_data = update_flight_data(icao=hex_code, flight=flight_code)
        data_queue.put(updated_data)

def receive_flight_data(host: str, port: int, delay: float, data_queue: Queue) -> None:
    """
    Receives flight data from a specified host and port, and processes the data by placing it into a specified Queue.

    :param host: The hostname or IP address to connect to.
    :type host: str
    :param port: The port number to connect to.
    :type port: int
    :param delay: The time delay (in seconds) to wait between receiving data.
    :type delay: float
    :param data_queue: The Queue object to store the received flight data.
    :type data_queue: Queue
    :return: None
    """
    buffer = ''

    with socket(AF_INET, SOCK_STREAM) as s:
        s.connect((str(host), int(port)))

        while True:
            data = s.recv(4096)

            if not data:
                break

            # A TCP read can end in the middle of a line, so keep the
            # incomplete tail in the buffer until the rest arrives.
            buffer += data.decode('utf-8', errors='replace')
            *messages, buffer = buffer.split('\n')

            for msg in messages:
                process_message(msg.strip(), data_queue)

            sleep(float(delay))

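Because TCP delivers a byte stream, a single `recv` call can end in the middle of a message. The sketch below shows one way to carry the incomplete tail over to the next read. `split_lines` is a hypothetical helper that is not part of flight_data.py, and the sample chunks are made up.

```python
def split_lines(buffer, chunk):
    """Append chunk to buffer and return (complete_lines, remaining_tail)."""
    buffer += chunk
    *lines, tail = buffer.split('\n')
    return [line.strip() for line in lines if line.strip()], tail


if __name__ == '__main__':
    # Two made-up reads: the second message is cut in half by the first read.
    chunks = ['MSG,1,first\nMSG,3,sec', 'ond\n']
    tail = ''
    for chunk in chunks:
        lines, tail = split_lines(tail, chunk)
        print(lines)
```

The first read yields only the complete first message. The second message is returned once its remainder has arrived.
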
gui.py

 

This module defines the main application window (TrackerApp), which sets up the layout and widgets. It provides a method to update the GUI dynamically with new flight data and uses a scrollable canvas so that many flight entries can be displayed.
CODE
from tkinter import Tk, Frame, Label, Canvas, Scrollbar


class TrackerApp(Tk):

    __HEADER: tuple = ('HEX', 'Flight')

    def __init__(self, width: int, height: int):
        """
        Class constructor.

        :param width: screen width
        :type width: int
        :param height: screen height
        :type height: int
        """
        super().__init__()
        self._screen_width = int(width)
        self._screen_height = int(height)

        self.resizable(width=False, height=False)
        self.geometry(f'{self._screen_width}x{self._screen_height}+0+0')

        self.__add_header()

        self.canvas = Canvas(self)
        self.scroll_y = Scrollbar(self, orient='vertical', command=self.canvas.yview)
        self.canvas.config(yscrollcommand=self.scroll_y.set)

        self.canvas_frame = Frame(self.canvas)
        self.canvas.create_window((0, 0), window=self.canvas_frame, anchor='nw')

        self.canvas_frame.bind('<Configure>', lambda e: self.__on_frame_configure(self.canvas))

        self.scroll_y.pack(side='right', fill='y')
        self.canvas.pack(side='left', fill='both', expand=True)

        self.labels = {}

    @staticmethod
    def __on_frame_configure(canvas: Canvas) -> None:
        """
        Update the scroll region of a canvas when the size of its frame changes.

        :param canvas: The canvas widget whose scroll region needs to be updated.
        :type canvas: Canvas
        :return: None
        """
        canvas.config(scrollregion=canvas.bbox('all'))

    def __add_header(self) -> None:
        """
        Private method to add a header frame to the top of the window.

        :return: None
        """
        header_frame = Frame(self)
        header_frame.pack(fill='x', side='top')

        header_frame.grid_columnconfigure(index=0, weight=1)
        header_frame.grid_columnconfigure(index=1, weight=1)

        for number in range(2):
            text = self.__HEADER[number]
            header_label = Label(header_frame, text=text, borderwidth=1, relief='solid', height=2)
            header_label.grid(row=0, column=number, sticky='nsew')

        header_frame.grid_rowconfigure(index=0, weight=1)

    def update_flight_data(self, flight_data: list) -> None:
        """
        Update the flight data displayed on the canvas.

        :param flight_data: A list of (HEX, flight) tuples containing the flight data to display.
        :return: None
        """
        for i, (hex_code, flight_code) in enumerate(flight_data):
            for j, text in enumerate((hex_code, flight_code)):
                if (i, j) in self.labels:
                    self.labels[(i, j)].config(text=text)
                else:
                    label = Label(self.canvas_frame, text=text, borderwidth=1, relief='solid', width=12, height=2)
                    label.grid(row=i, column=j, sticky='nsew')
                    self.labels[(i, j)] = label

        current_labels = set((i, j) for i in range(len(flight_data)) for j in range(2))

        for pos in list(self.labels):
            if pos not in current_labels:
                self.labels[pos].destroy()
                del self.labels[pos]

        self.canvas_frame.update_idletasks()
        self.__on_frame_configure(self.canvas)

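The cleanup at the end of `update_flight_data` relies on simple set logic: any label whose (row, column) position lies outside the current table is stale and gets destroyed. As an illustration only, the same rule can be written as a small pure function. `stale_positions` is a hypothetical name and is not part of gui.py.

```python
def stale_positions(existing, row_count, column_count=2):
    """Return the grid positions in existing that lie outside the current table."""
    current = {(row, col) for row in range(row_count) for col in range(column_count)}
    return sorted(pos for pos in existing if pos not in current)


if __name__ == '__main__':
    # Three rows were shown before; only two flights remain now.
    shown = {(0, 0), (0, 1), (1, 0), (1, 1), (2, 0), (2, 1)}
    print(stale_positions(shown, row_count=2))  # [(2, 0), (2, 1)]
```
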
main.py

 

This script initializes the GUI (TrackerApp), starts a separate thread that receives and processes flight data continuously, updates the GUI every 100 ms with the latest flight data from the queue, and starts the Tkinter main event loop to keep the application running.
CODE
from threading import Thread
from queue import Queue
from lib.gui import TrackerApp
from lib.flight_data import receive_flight_data

SCREEN_WIDTH: int = 240
SCREEN_HEIGHT: int = 320
HOST: str = '127.0.0.1'
PORT: int = 30003
DELAY: float = 2.5

def flight_data_thread(data_queue: Queue) -> None:
    """
    Receive flight data in a separate thread.

    :param data_queue: A queue to store the received flight data.
    :return: None
    """
    receive_flight_data(HOST, PORT, DELAY, data_queue)

def main() -> None:
    """
    This function is the entry point of the program.

    :return: None
    """
    app = TrackerApp(width=SCREEN_WIDTH, height=SCREEN_HEIGHT)

    flight_data_queue = Queue()

    thread = Thread(target=flight_data_thread, args=(flight_data_queue,), daemon=True)
    thread.start()

    def update_gui():
        """
        Update the GUI with flight data from the flight_data_queue.

        :return: None
        """
        while not flight_data_queue.empty():
            flight_data = flight_data_queue.get()
            app.update_flight_data(flight_data)
        app.after(100, update_gui)

    app.after(100, update_gui)

    app.mainloop()


if __name__ == '__main__':
    main()
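
Each queue entry is a complete snapshot of all known flights, so only the newest entry matters when several are waiting. As a possible variation (not what main.py above does), the polling callback could discard the older snapshots and redraw only once. `drain_latest` is a hypothetical helper.

```python
from queue import Empty, Queue


def drain_latest(data_queue):
    """Remove all waiting items and return the newest one, or None if empty."""
    latest = None
    while True:
        try:
            latest = data_queue.get_nowait()
        except Empty:
            return latest


if __name__ == '__main__':
    queue = Queue()
    for snapshot in (['old'], ['newer'], ['newest']):
        queue.put(snapshot)
    print(drain_latest(queue))  # ['newest']
```

Inside `update_gui`, the result would be passed to `app.update_flight_data` only when it is not None.
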
STEP 4
Upload to UNIHIKER Device (SCP/SMB)

To transfer the files to your UNIHIKER device, you can use SCP (Secure Copy) or SMB (Samba). Read the UNIHIKER documentation on SSH and SMB and decide which one you prefer. Here is an example using SCP; replace the user and address with those of your device:

$ scp -r ADS-B <user>@<unihiker-ip>:/root/

STEP 5
Running the Application

Once the directories and files are on your UNIHIKER device, you can run the application and track nearby aircraft. As in the first test (Step 2), I will show you how to do it with two SSH connections.

CODE
# SSH Connection (tty000)

# execute dump1090-mutability
$ dump1090-mutability --interactive --net

# SSH Connection (tty001)

# change directory
$ cd /root/ADS-B/

# run Python application
$ python3 main.py

dump1090-mutability must be running before you start the application, because the application connects to port 30003 as soon as it starts. It may take some time until all data is displayed. Here's an example:

 
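If the application window stays empty, one thing worth checking is whether the dump1090 port is reachable at all. The following is a small sketch under the assumption that dump1090-mutability listens on 127.0.0.1:30003, the values used in main.py. `port_is_open` is a hypothetical helper.

```python
from socket import create_connection


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == '__main__':
    # 127.0.0.1:30003 are the HOST and PORT values used in main.py.
    print('dump1090 reachable:', port_is_open('127.0.0.1', 30003))
```
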

STEP 6
Annotations

What Can Be Improved:

 

Error Handling: Add comprehensive error handling to manage network issues and data processing errors.

 

Enhanced GUI: Improve the GUI to display more detailed flight information such as altitude, speed, and position.

 

Performance Optimization: Optimize data processing and GUI updates to handle large volumes of flight data efficiently.
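
As one possible starting point for the error handling mentioned above, a failing network call can be retried with a growing delay. This is only a sketch: `run_with_retry` and its parameters are hypothetical and not part of the project.

```python
from time import sleep


def run_with_retry(task, attempts=5, first_delay=1.0, wait=sleep):
    """Call task() until it succeeds; wait longer after each OSError."""
    delay = first_delay
    for attempt in range(1, attempts + 1):
        try:
            return task()
        except OSError as error:
            # Give up and re-raise once the last attempt has failed.
            if attempt == attempts:
                raise
            print(f'Attempt {attempt} failed ({error}); retrying in {delay:.1f}s')
            wait(delay)
            delay *= 2
```

In the receiver thread, `task` could be a function that calls `receive_flight_data`. A refused connection raises `ConnectionRefusedError`, which is a subclass of `OSError`.
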

 

 

More Ideas:

 

Historical Data Storage: Implement a feature to store historical flight data in a database.

 

Data Visualization: Create additional visualizations, such as graphs and maps, to display flight paths and statistics.

 

Alerts and Notifications: Add functionality to alert users about specific flights or unusual flight patterns.
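
For the historical data idea, Python's built-in `sqlite3` module may be enough to start with. The sketch below is an assumption about how such storage could look. The table name, the columns, and both helper functions are hypothetical.

```python
import sqlite3
from time import time


def open_store(path=':memory:'):
    """Open the database and create the sightings table if it is missing."""
    connection = sqlite3.connect(path)
    connection.execute(
        'CREATE TABLE IF NOT EXISTS sightings ('
        'icao TEXT NOT NULL, flight TEXT, seen_at REAL NOT NULL)'
    )
    return connection


def record_sighting(connection, icao, flight, seen_at=None):
    """Store one sighting; seen_at defaults to the current Unix time."""
    connection.execute(
        'INSERT INTO sightings (icao, flight, seen_at) VALUES (?, ?, ?)',
        (icao, flight, time() if seen_at is None else seen_at),
    )
    connection.commit()


if __name__ == '__main__':
    store = open_store()
    record_sighting(store, 'ABC123', 'TEST123')  # made-up values
    print(store.execute('SELECT COUNT(*) FROM sightings').fetchone()[0])
```

By default, a `sqlite3` connection may only be used in the thread that created it, so it should be opened in the same thread that records the sightings.
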

License
All Rights Reserved