Intelligent Aquarium Ecosystem Monitor Based on YOLOv8 Keypoints Detection and Tracking
1.Project Introduction
1.1 Project Overview
Do you have an aquarium at home with plenty of goldfish? Would you like to monitor your precious goldfish in real time? We have created an aquarium ecosystem monitor using the UNIHIKER K10! It implements a fish detection system with a YOLOv8 keypoints detection model, combined with the MediaPipe library, to recognize key points of fish (such as the head, body, and tail) in real time. The system can accurately identify the quantity, species, posture, movement trajectory, and abnormal gathering of a fish school, and it issues a timely alarm when the fish gather abnormally.

The computer communicates with the UNIHIKER K10 board via SIoT. When fish appear in front of the K10 camera, the computer identifies their number, species, posture, and movement trajectories and displays the results on the screen. In addition, the UNIHIKER K10 responds to abnormal conditions of the fish school, offering a highly immersive artificial intelligence experience!
1.2 Project Functional Diagrams

1.3 Project Video
2.Materials List
2.1 Hardware list

2.2 Software
Mind+ Graphical Programming Software (Minimum Version Requirement: V1.8.1 RC1.0)

2.3 Basic Mind+ Software Usage
(1) Double-click to open Mind+
The following screen will appear.

Click and switch to offline mode.

(2) Load UNIHIKER K10
Following the previous steps, click "Extensions", find the "UNIHIKER K10" module under "Board", and click to add it. After clicking "Back", you will find the UNIHIKER K10 blocks in the Command Area, which completes the loading of the UNIHIKER K10.

Then, you need to use a USB cable to connect the UNIHIKER K10 to the computer.

Then click "Connect Device" and select "COM7-UNIHIKER K10" to connect.

Note: The device name may differ between UNIHIKER K10 boards, but it always ends with "K10".
In Windows 10/11, the UNIHIKER K10 is driver-free. However, for Windows 7, manual driver installation is required: https://www.unihiker.com/wiki/K10/faq/#high-frequency-problem.
The next interface you see is the Mind+ programming interface. Let's see what this interface consists of.

Note: For a detailed description of each area of the Mind+ interface, see the Knowledge Hub section of this lesson.
3.Construction Steps
The project is divided into three main parts:
(1) Task 1: UNIHIKER K10 Networking and Webcam Activation
Connect UNIHIKER K10 through IoT communication and enable the webcam function to establish a visual communication channel with the computer for transmitting video data.
(2) Task 2: Visual Detection and Data Upload
The video captured by the camera on the UNIHIKER K10 is transmitted to the computer, where the real-time stream is processed with the YOLO model and the MediaPipe library. Meanwhile, the UNIHIKER K10 is connected to the MQTT platform, and the computer uploads the detection results to the SIoT platform.
(3) Task 3: UNIHIKER K10 Receiving Results and Executing Control
UNIHIKER K10 remotely retrieves the inference results from SIoT and performs different actions depending on whether the fish school is abnormally aggregated.
3.1 Task 1: UNIHIKER K10 Networking and Webcam Activation
(1) Hardware Setup
Confirm that the UNIHIKER K10 is connected to the computer via a USB cable.
(2) Software Preparation
Make sure that Mind+ is opened and the UNIHIKER board has been successfully loaded. Once confirmed, you can proceed to write the project program.

(3)Write the Program
UNIHIKER K10 Network Connection and Webcam Activation
To enable communication between the computer and UNIHIKER K10, first ensure both devices are connected to the same local network.

First, add the MQTT communication and Wi-Fi modules from the extension library. Refer to the diagram for the commands.

After the UNIHIKER K10 is connected to the network, its camera stream needs to be published to the local area network (LAN) so that any computer on the LAN can access it at any time. This allows the computer to apply existing computer vision libraries to the images for recognition. Therefore, we next load the webcam library so that the video captured by the UNIHIKER K10 can be transmitted to the computer.
Click on "Extensions" in the Mind+ toolbar, enter the "User Ext" category. Input: https://gitee.com/yeezb/k10web-cam in the search bar, and click to load the K10 webcam extension library.
User library link: https://gitee.com/yeezb/k10web-cam

We need to use the "Wi-Fi connect to account (SSID, Password)" command in the network communication extension library to configure Wi-Fi for the UNIHIKER K10. Please ensure that the UNIHIKER K10 is connected to the same Wi-Fi network as your computer. We also need to use the "Webcam On" function to enable the webcam feature, so that the video captured by the UNIHIKER K10 can be transmitted to the computer.

Once the network connection is successfully established, the "Webcam On" block can be used to transmit the video captured by the UNIHIKER K10 to the computer.

Then we need to read the UNIHIKER K10's IP address from the serial monitor. Use a loop that repeats five times to print the IP address five times: drag in the serial content output block, select string output, and enable line wrapping; then read the Wi-Fi configuration, select the acquired IP address, and print it once per second.

Click the Upload button. When the upload progress reaches 100%, the program has been successfully uploaded.

Open the serial monitor, and you will see the IP address of the UNIHIKER K10.
Open <IP address>/stream in your browser to view the camera feed. For example, with the IP address shown in the picture above, you would enter 192.168.11.72/stream in your browser.
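If you prefer to check the stream from Python rather than the browser, the short sketch below (a minimal example that reuses the example address 192.168.11.72 and assumes the requests library is installed; substitute your own K10's IP) requests the stream and confirms that JPEG frames are arriving:

import requests

stream_url = "http://192.168.11.72/stream"   # replace with your own UNIHIKER K10's IP
resp = requests.get(stream_url, stream=True, timeout=10)
print("HTTP status:", resp.status_code)       # 200 means the webcam stream is reachable

# Read a little data and confirm it contains a JPEG frame start marker (0xFFD8)
buf = b''
for chunk in resp.iter_content(chunk_size=1024):
    buf += chunk
    if b'\xff\xd8' in buf:
        print("Receiving JPEG frames from the UNIHIKER K10 webcam")
        break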

3.2 Task 2: Visual Detection and Data Upload
Next, the UNIHIKER K10 needs to be connected to the MQTT platform. We will create an SIoT topic to store the results and use the computer to perform keypoint detection of the fish. Based on the keypoints, we will analyze the number, species, movements, and trajectories of the fish. The detection results will then be sent to SIoT for further processing on the UNIHIKER K10.
(1) Prepare the Computer Environment
First, on the computer, download the Windows version of SIoT_V2, extract it, and double-click start SIoT.bat to start SIoT. After starting, a black window will pop up to initialize the server. Critical note: DO NOT close this window during operation, as doing so will terminate the SIoT service immediately.

Note: For details on downloading SIoT_V2, please refer to: https://drive.google.com/file/d/1qVhyUmvdmpD2AYl-2Cijl-2xeJgduJLL/view?usp=drive_link
After starting SIoT.bat on the computer, initialize the MQTT parameters on the UNIHIKER K10: set the IP address to the local computer's IP, the username to siot, and the password to dfrobot.

We need to install the Python dependency libraries required for this project to identify and process the key information about the fish. Open a new Mind+ window, navigate to the mode switch area, and select "Python Mode".

In Python Mode, click "Code" in the toolbar, then open "Library Management"; the library installation page will open for dependency management.

Click "PIP Mode" and run the following commands in sequence to install six libraries including the mediapipe library and the ultralytics library.
pip insatll mediapipe
pip install ultralytics
numpy
request
opencv-python
opencv-contrib-python

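After the installation finishes, you can confirm that everything is available with a short import check (a minimal sketch; any library that failed to install will raise an ImportError):

import cv2
import numpy
import requests
import mediapipe
from ultralytics import YOLO

print("OpenCV:", cv2.__version__, "| NumPy:", numpy.__version__)
print("All required libraries imported successfully")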

(2) Write the Program
STEP One: Create Topics
Access "Your Computer IP Address:8080", such as"192.168.11.41:8080",in a web browser on your computer.

Enter the username 'siot' and password 'dfrobot' to log in to the SIoT platform.

After logging into the SIoT platform, navigate to the Topic section and create a topic named 'fish' (used to store the control instructions sent to the UNIHIKER K10). Refer to the operations shown in the image below.

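With SIoT running and the 'fish' topic created, you can verify the connection from Python before writing the full program. The sketch below uses the same siot library calls that appear later in fish.py (it assumes the siot library is available in the Mind+ Python environment and that SIoT is running on this computer at 127.0.0.1):

import siot

siot.init(client_id="fish-monitor-test",  # any unique client ID
          server="127.0.0.1",             # IP of the computer running SIoT
          port=1883,
          user="siot",
          password="dfrobot")
siot.connect()
siot.loop()
siot.publish(topic="siot/fish", data="test")  # the message should appear under the 'fish' topic on the SIoT web page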
Next, let's write the code for fish detection. Its main functional module analyzes the behavior of the fish school by tracking key body parts.
Create a new Python file named "fish.py" in Mind+: in the "Files in Project" directory of the right sidebar, create a new .py file named "fish".

STEP Two: Code for Detecting Keypoints of Fish
We use the YOLO model to detect the keypoints of the fish and send instructions to the 'fish' topic on the SIoT platform based on the swimming trajectory and aggregation degree of the fish in the image.
import cv2
import numpy as np
import torch
from ultralytics import YOLO
import math
from collections import deque, defaultdict
import time
import sys
import requests
import siot  # Import SIoT library


class FishAnalyzer:
    def __init__(self):
        """
        Initialize fish analyzer with a single YOLO model for pose estimation and classification
        """
        # Initialize SIoT connection parameters
        self.siot_client_id = "32867041742986824"
        self.siot_server = "127.0.0.1"
        self.siot_port = 1883
        self.siot_user = "siot"
        self.siot_password = "dfrobot"
        # Establish SIoT connection
        siot.init(client_id=self.siot_client_id,
                  server=self.siot_server,
                  port=self.siot_port,
                  user=self.siot_user,
                  password=self.siot_password)
        siot.connect()
        siot.loop()
        # Track previous cluster state
        self.last_cluster_state = False
        # Load YOLO model (supports both pose estimation and classification)
        self.model = YOLO('yolo.pt')
        # Fish keypoint index mapping
        self.keypoint_mapping = {
            'head': 0,               # Fish head
            'body_center': 5,        # Body center
            'tail': 11,              # Tail
            'dorsal_fin': 6,         # Dorsal fin
            'caudal_fin_left': 13,   # Left caudal fin
            'caudal_fin_right': 14   # Right caudal fin
        }
        # Fish species mapping (based on model output)
        self.fish_species = {
            0: "Goldfish",
            1: "Koi",
            2: "Betta",
            3: "Other"  # Default category
        }
        # Tracking parameters
        self.tracked_fish = {}  # {id: {'trail': deque, 'last_seen': frame_count, 'species': species_id}}
        self.fish_id_counter = 0  # Fish ID counter
        self.seen_ids = set()  # Track all unique IDs
        self.confidence_threshold = 0.3  # Keypoint confidence threshold
        self.classification_confidence = 0.5  # Classification confidence threshold
        # Trail length and colors
        self.trail_length = 20  # Maximum trail length
        self.colors = self.generate_distinct_colors(50)  # Generate 50 distinct colors
        # Species counters
        self.species_count = defaultdict(int)  # Current frame species count
        self.total_species_count = defaultdict(int)  # Historical species count
        # Fish cluster alarm parameters
        self.cluster_threshold = 150  # Fish cluster distance threshold (pixels)
        self.min_fish_for_cluster = 3  # Minimum fish count to trigger alarm
        self.alarm_active = False  # Alarm status (active/inactive)
        self.alarm_counter = 0  # Alarm counter (to avoid false alarms)
        self.alarm_duration = 30  # Alarm duration (frames)
        self.alarm_history = deque(maxlen=100)  # Alarm history (max 100 entries)

    def generate_distinct_colors(self, n):
        """Generate a set of distinct colors"""
        colors = []
        for i in range(n):
            hue = i * 360 / n  # Evenly distribute hue values
            hsv_color = np.array([[[hue, 1.0, 1.0]]], dtype=np.uint8)
            bgr_color = cv2.cvtColor(hsv_color, cv2.COLOR_HSV2BGR)[0][0]  # Convert to BGR format (OpenCV default)
            colors.append(tuple(int(c) for c in bgr_color))
        return colors

    def calculate_vector_angle(self, p1, p2, reference='x-axis'):
        """Calculate angle of vector formed by two points with reference to axis (degrees)"""
        if p1 is None or p2 is None:
            return None
        # Calculate vector components
        dx = p2[0] - p1[0]
        dy = p2[1] - p1[1]
        # Avoid zero vector (cannot calculate angle)
        if dx == 0 and dy == 0:
            return None
        # Calculate angle with x-axis
        angle_rad = math.atan2(dy, dx)  # In radians
        angle_deg = math.degrees(angle_rad)  # Convert to degrees
        # Convert to 0-360 degree range
        if angle_deg < 0:
            angle_deg += 360
        return angle_deg

    def calculate_three_point_angle(self, p1, p2, p3):
        """Calculate angle formed by three points (p2 as vertex)"""
        if p1 is None or p2 is None or p3 is None:
            return None
        # Calculate vectors
        v1 = [p1[0] - p2[0], p1[1] - p2[1]]  # Vector from p2 to p1
        v2 = [p3[0] - p2[0], p3[1] - p2[1]]  # Vector from p2 to p3
        # Calculate dot product and vector magnitudes
        dot_product = v1[0] * v2[0] + v1[1] * v2[1]
        len_v1 = math.sqrt(v1[0]**2 + v1[1]**2)
        len_v2 = math.sqrt(v2[0]**2 + v2[1]**2)
        if len_v1 == 0 or len_v2 == 0:
            return None
        # Calculate angle (clamp cosine to avoid numerical overflow)
        cos_theta = dot_product / (len_v1 * len_v2)
        cos_theta = max(min(cos_theta, 1.0), -1.0)
        angle_rad = math.acos(cos_theta)
        return math.degrees(angle_rad)

    def update_tracking(self, detections, current_frame):
        """Update fish tracking information (ID assignment and trail maintenance)"""
        current_ids = set()  # IDs present in current frame
        # Remove fish not seen for a while (delete after 30 frames)
        to_remove = []
        for fish_id, fish_data in self.tracked_fish.items():
            if current_frame - fish_data['last_seen'] > 30:
                to_remove.append(fish_id)
        for fish_id in to_remove:
            del self.tracked_fish[fish_id]
        # Assign IDs to each detected fish (nearest neighbor matching)
        for detection in detections:
            center = detection['center']  # Current fish center coordinates
            min_distance = float('inf')  # Minimum distance for matching
            best_match = None  # Best matching tracked ID
            # Find nearest tracked fish
            for fish_id, fish_data in self.tracked_fish.items():
                last_center = fish_data['trail'][-1] if fish_data['trail'] else None
                if last_center:
                    # Calculate Euclidean distance
                    distance = math.hypot(center[0] - last_center[0], center[1] - last_center[1])
                    # Consider as same fish if distance below threshold
                    if distance < min_distance and distance < 100:
                        min_distance = distance
                        best_match = fish_id
            # Update existing track or create new track
            if best_match is not None:
                # Match found: update trail and last seen time
                fish_id = best_match
                self.tracked_fish[fish_id]['last_seen'] = current_frame
                self.tracked_fish[fish_id]['trail'].append(center)
                # Remove oldest point if exceeding trail length
                if len(self.tracked_fish[fish_id]['trail']) > self.trail_length:
                    self.tracked_fish[fish_id]['trail'].popleft()
                detection['id'] = fish_id
            else:
                # New fish: assign new ID and initialize tracking info
                fish_id = self.fish_id_counter
                self.tracked_fish[fish_id] = {
                    'trail': deque([center], maxlen=self.trail_length),  # Initialize trail
                    'last_seen': current_frame,
                    'species': detection.get('species', 3)  # Default to "Other"
                }
                detection['id'] = fish_id
                self.fish_id_counter += 1
            # Record current ID and all seen IDs
            current_ids.add(fish_id)
            self.seen_ids.add(fish_id)
        return current_ids

    def detect_clusters(self, centers):
        """Detect if fish are clustering together"""
        # Don't detect clusters if insufficient fish
        if len(centers) < self.min_fish_for_cluster:
            return False, None, None
        # Calculate average distance between all fish
        total_distance = 0
        count = 0
        for i in range(len(centers)):
            for j in range(i + 1, len(centers)):
                dist = math.hypot(centers[i][0] - centers[j][0], centers[i][1] - centers[j][1])
                total_distance += dist
                count += 1
        if count == 0:
            return False, None, None
        avg_distance = total_distance / count
        # Check for clustering (average distance below threshold)
        cluster_found = False
        cluster_center = None  # Cluster center coordinates
        cluster_radius = 0  # Cluster radius (distance to farthest fish)
        if avg_distance < self.cluster_threshold:
            cluster_found = True
            # Calculate cluster center (average of all fish centers)
            cluster_x = sum(c[0] for c in centers) / len(centers)
            cluster_y = sum(c[1] for c in centers) / len(centers)
            cluster_center = (cluster_x, cluster_y)
            # Calculate cluster radius
            max_dist = 0
            for c in centers:
                dist = math.hypot(c[0] - cluster_x, c[1] - cluster_y)
                if dist > max_dist:
                    max_dist = dist
            cluster_radius = max_dist
        return cluster_found, cluster_center, cluster_radius

    def send_cluster_alarm(self, cluster_found):
        """Send cluster alarm status to SIoT when state changes"""
        # Only send command when cluster state changes
        if cluster_found != self.last_cluster_state:
            if cluster_found:
                print("[Fish Cluster] Sending 'a' to siot/fish")
                siot.publish(topic="siot/fish", data="a")  # Send cluster alert
            # Update state record
            self.last_cluster_state = cluster_found

    def analyze_fish(self, frame, frame_count):
        """Analyze fish in frame, return pose, species and clustering information"""
        # Use YOLO model for prediction
        results = self.model.predict(
            frame,
            conf=0.5,  # Object detection confidence threshold
            verbose=False  # Disable detailed logging
        )
        fish_analysis = []  # Store analysis results for each fish
        detections = []  # Detection info for tracking
        centers = []  # Center coordinates for cluster detection
        # Reset current frame species counter
        self.species_count = defaultdict(int)
        for result in results:
            # Iterate through each detected fish (bounding boxes and keypoints)
            for box, keypoints in zip(result.boxes, result.keypoints):
                # Filter low confidence detections
                if box.conf.item() < 0.5:
                    continue
                # Get bounding box coordinates (x1,y1: top-left; x2,y2: bottom-right)
                x1, y1, x2, y2 = box.xyxy[0].numpy()
                center = ((x1 + x2) / 2, (y1 + y2) / 2)  # Calculate center coordinates
                centers.append(center)  # Add to cluster detection list
                # Get classification information
                cls_id = box.cls.item()
                species_id = int(cls_id)
                species_conf = box.conf.item()
                species_name = self.fish_species.get(species_id, "Unknown")
                # Update species counters
                self.species_count[species_id] += 1
                if species_id not in self.total_species_count:
                    self.total_species_count[species_id] = 0
                # Extract keypoint coordinates and confidence scores
                kpts = keypoints.xy[0].numpy()  # Keypoint coordinates
                confs = keypoints.conf[0].numpy() if keypoints.conf is not None else [1.0] * len(kpts)
                # Filter low confidence keypoints
                points = {}
                for name, idx in self.keypoint_mapping.items():
                    if idx < len(kpts) and confs[idx] > self.confidence_threshold:
                        points[name] = (kpts[idx][0], kpts[idx][1])
                    else:
                        points[name] = None
                # If body center missing, try calculating from shoulder keypoints
                if points.get('body_center') is None:
                    if confs[5] > self.confidence_threshold and confs[6] > self.confidence_threshold:
                        left_shoulder = kpts[5]
                        right_shoulder = kpts[6]
                        body_center_x = (left_shoulder[0] + right_shoulder[0]) / 2
                        body_center_y = (left_shoulder[1] + right_shoulder[1]) / 2
                        points['body_center'] = (body_center_x, body_center_y)
                # Calculate various angle features
                analysis = {
                    # Swimming direction (vector from head to tail)
                    'swim_direction': self.calculate_vector_angle(
                        points.get('head'), points.get('tail')),
                    # Body tilt angle (vector from head to body center)
                    'body_tilt': self.calculate_vector_angle(
                        points.get('head'), points.get('body_center')),
                    # Caudal fin angle (angle at tail between left and right caudal fins)
                    'caudal_fin_angle': self.calculate_three_point_angle(
                        points.get('caudal_fin_left'),
                        points.get('tail'),
                        points.get('caudal_fin_right')),
                    # Dorsal fin angle (vector from body center to dorsal fin)
                    'dorsal_fin_angle': self.calculate_vector_angle(
                        points.get('body_center'), points.get('dorsal_fin'))
                }
                # Consolidate all fish information
                fish_data = {
                    'points': points,  # Keypoint coordinates
                    'angles': analysis,  # Angle features
                    'bbox': (x1, y1, x2, y2),  # Bounding box
                    'center': center,  # Center coordinates
                    'species_id': species_id,  # Species ID
                    'species_name': species_name,  # Species name
                    'species_conf': species_conf  # Species confidence
                }
                fish_analysis.append(fish_data)
                detections.append({
                    'center': center,
                    'data': fish_data,
                    'species': species_id
                })
        # Update tracking information and assign IDs
        current_ids = self.update_tracking(detections, frame_count)
        # Add ID to each fish
        for i, detection in enumerate(detections):
            if 'id' in detection:
                detection['data']['id'] = int(detection['id'])
            else:
                detection['data']['id'] = i
        # Count statistics
        current_count = len(fish_analysis)  # Current frame fish count
        total_unique_count = len(self.seen_ids)  # Historical unique fish count (deduplicated)
        # Detect fish clusters
        cluster_found, cluster_center, cluster_radius = self.detect_clusters(centers)
        # Update alarm status (activate only after 5 consecutive frames to avoid false alarms)
        if cluster_found:
            self.alarm_counter += 1
            if self.alarm_counter >= 5:
                self.alarm_active = True
                # Record alarm event
                self.alarm_history.append({
                    'frame': frame_count,
                    'time': time.time(),
                    'center': cluster_center,
                    'radius': cluster_radius,
                    'fish_count': current_count
                })
        else:
            self.alarm_counter = max(0, self.alarm_counter - 1)
            if self.alarm_counter == 0:
                self.alarm_active = False
        # Send cluster alert to SIoT
        self.send_cluster_alarm(self.alarm_active)
        return fish_analysis, current_count, total_unique_count, cluster_found, cluster_center, cluster_radius

    def draw_analysis(self, frame, analysis_results, current_count, total_unique_count,
                      cluster_found, cluster_center, cluster_radius, fps=0):
        """Draw analysis results on frame (angle text drawing removed)"""
        # Display count information
        cv2.putText(frame, f"Current: {current_count}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        cv2.putText(frame, f"Total: {total_unique_count}", (10, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        # Display species statistics
        y_offset = 120
        for species_id, count in self.species_count.items():
            species_name = self.fish_species.get(species_id, "Unknown")
            total_count = self.total_species_count.get(species_id, 0)
            cv2.putText(frame, f"{species_name}: {count} (Total: {total_count})",
                        (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)
            y_offset += 20
        # Draw fish cluster alarm
        if cluster_found and cluster_center is not None:
            # Draw cluster area
            cv2.circle(frame, (int(cluster_center[0]), int(cluster_center[1])),
                       int(cluster_radius), (0, 0, 255), 2)
            cv2.circle(frame, (int(cluster_center[0]), int(cluster_center[1])),
                       5, (0, 0, 255), -1)
            # Draw alarm text
            if self.alarm_active:
                # Blinking effect
                if int(time.time() * 2) % 2 == 0:
                    cv2.putText(frame, "FISH CLUSTER!",
                                (frame.shape[1] // 2 - 90, 50),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 3)
                    # Draw red border
                    cv2.rectangle(frame, (0, 0), (frame.shape[1] - 1, frame.shape[0] - 1),
                                  (0, 0, 255), 10)
            else:
                cv2.putText(frame, "Fish Cluster Detected",
                            (int(cluster_center[0]) - 100, int(cluster_center[1]) - int(cluster_radius) - 20),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
        for result in analysis_results:
            points = result['points']
            bbox = result.get('bbox')
            fish_id = result.get('id', 'N/A')
            species_name = result.get('species_name', 'Unknown')
            species_conf = result.get('species_conf', 0)
            # Ensure fish_id is integer
            if fish_id != 'N/A':
                fish_id = int(fish_id)
                color = self.colors[fish_id % len(self.colors)]
            else:
                color = (0, 255, 0)
            # Draw trail
            if fish_id != 'N/A' and fish_id in self.tracked_fish:
                trail = self.tracked_fish[fish_id]['trail']
                for i in range(1, len(trail)):
                    cv2.line(frame,
                             (int(trail[i - 1][0]), int(trail[i - 1][1])),
                             (int(trail[i][0]), int(trail[i][1])),
                             color, 2)
            # Draw bounding box
            if bbox:
                x1, y1, x2, y2 = bbox
                cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)
                # Display fish ID and species
                display_text = f"ID: {fish_id} - {species_name} ({species_conf:.2f})"
                cv2.putText(frame, display_text, (int(x1), int(y1) - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
            # Draw keypoints
            for name, point in points.items():
                if point is not None:
                    x, y = point
                    cv2.circle(frame, (int(x), int(y)), 5, color, -1)
                    cv2.putText(frame, name[:2], (int(x) + 5, int(y)),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
            # Draw connecting line (head to tail)
            if points.get('head') and points.get('tail'):
                cv2.line(frame,
                         (int(points['head'][0]), int(points['head'][1])),
                         (int(points['tail'][0]), int(points['tail'][1])),
                         (0, 255, 255), 2)
        return frame


# Main program
if __name__ == "__main__":
    # Initialize analyzer
    analyzer = FishAnalyzer()
    # Network stream address
    url = 'http://192.168.11.72/stream'  # UNIHIKER K10 IP address
    # Create window
    cv2.namedWindow("Fish Behavior Analysis", cv2.WINDOW_AUTOSIZE)
    # Initialize video parameters
    frame_width = 640  # Default value, will get from first frame
    frame_height = 480  # Default value, will get from first frame
    fps = 30  # Default frame rate
    # Create output video
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter('output.avi', fourcc, fps, (frame_width, frame_height))
    frame_count = 0  # Frame counter
    start_time = time.time()
    # Start HTTP request to get video stream
    try:
        response = requests.get(url, stream=True, timeout=10)
        print("Connected to stream")
        img_data = b''  # Buffer for received image data
        # Read data in chunks
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                img_data += chunk  # Accumulate received data
                # Check for complete JPEG data
                start_idx = img_data.find(b'\xff\xd8')  # Find JPEG start marker
                end_idx = img_data.find(b'\xff\xd9')  # Find JPEG end marker
                if start_idx != -1 and end_idx != -1:
                    jpg_data = img_data[start_idx:end_idx + 2]  # Extract complete JPEG data
                    # Convert to NumPy array
                    img_np = np.frombuffer(jpg_data, dtype=np.uint8)
                    frame = cv2.imdecode(img_np, cv2.IMREAD_COLOR)  # Decode JPEG image
                    if frame is not None:
                        frame_count += 1
                        # Get actual dimensions from first frame
                        if frame_count == 1:
                            frame_height, frame_width = frame.shape[:2]
                            # Reinitialize video writer
                            out = cv2.VideoWriter('output.avi', fourcc, fps, (frame_width, frame_height))
                        # Calculate FPS
                        elapsed_time = time.time() - start_time
                        current_fps = frame_count / elapsed_time if elapsed_time > 0 else 0
                        # Analyze fish behavior
                        results, current_count, total_unique_count, cluster_found, cluster_center, cluster_radius = analyzer.analyze_fish(frame, frame_count)
                        # Draw analysis results
                        frame_with_analysis = analyzer.draw_analysis(
                            frame, results, current_count, total_unique_count,
                            cluster_found, cluster_center, cluster_radius, current_fps)
                        # Write to output video
                        out.write(frame_with_analysis)
                        # Display result
                        cv2.imshow("Fish Behavior Analysis", frame_with_analysis)
                    # Clear processed data
                    img_data = img_data[end_idx + 2:]  # Remove processed image data
                    # Exit on 'q' key press
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
            else:
                print("No data received!")
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
    finally:
        # Release resources
        out.release()
        cv2.destroyAllWindows()
        # Print final statistics
        print("\nFish Statistics:")
        print(f"Total Count: {analyzer.total_species_count}")
        for species_id, count in analyzer.total_species_count.items():
            species_name = analyzer.fish_species.get(species_id, "Unknown")
            print(f"{species_name}: {count}")
        # Print alarm history
        if analyzer.alarm_history:
            print("\nCluster Alarm History:")
            for alarm in analyzer.alarm_history:
                print(f"Frame {alarm['frame']}: Cluster at ({alarm['center'][0]:.1f}, {alarm['center'][1]:.1f}) "
                      f"with {alarm['fish_count']} fish")
        # Disconnect from SIoT
        siot.disconnect()
        print("SIoT connection closed")
Let's take a look at how each function of this code is implemented.
(1) Detection of four types of fish: identify four types of fish through the visual features learned by the model
This system uses the YOLO model to identify four types of fish through visual features, namely: goldfish, koi, betta, and other fish. The identification is based on the category ID output by the model, corresponding to the four fish categories clearly defined in the code.
self.fish_species = {
    0: "Goldfish",
    1: "Koi",
    2: "Betta",
    3: "Other"  # Default category
}

cls_id = box.cls.item()
species_id = int(cls_id)
species_name = self.fish_species.get(species_id, "Unknown")

if box.conf.item() < 0.5:
    continue
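For a quick standalone test of the species mapping, a minimal sketch like the one below runs the same model on a single image and prints each detection's class ID and name (it assumes the trained weights file yolo.pt and a sample image fish.jpg are in the working directory):

from ultralytics import YOLO

fish_species = {0: "Goldfish", 1: "Koi", 2: "Betta", 3: "Other"}

model = YOLO('yolo.pt')  # same weights file as in fish.py
results = model.predict('fish.jpg', conf=0.5, verbose=False)

for result in results:
    for box in result.boxes:
        species_id = int(box.cls.item())                 # class ID output by the model
        name = fish_species.get(species_id, "Unknown")   # map ID to species name
        print(f"{name}: confidence {box.conf.item():.2f}")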
(2) Keypoint detection of fish: by calculating the tilt angle, included angle, and other angles between keypoints, the swimming direction and body posture of the fish can be determined.
1. Keypoints for detection: head, body, tail, dorsal fin, left caudal fin, right caudal fin.
2. Basis for judgment:
- The keypoint coordinates and confidence levels (keypoints.xy and keypoints.conf) output by YOLO
- Only retain the keypoints with a confidence level greater than 0.3
- If body_center is missing, the left and right shoulder keypoints will be used for calculation
3. Application: calculate the swimming direction, body posture, tail fin angle, and other behavioral characteristics of the fish.
self.keypoint_mapping = {
    'head': 0,
    'body_center': 5,
    'tail': 11,
    'dorsal_fin': 6,
    'caudal_fin_left': 13,
    'caudal_fin_right': 14
}

kpts = keypoints.xy[0].numpy()
confs = keypoints.conf[0].numpy() if keypoints.conf is not None else [1.0] * len(kpts)

points = {}
for name, idx in self.keypoint_mapping.items():
    if idx < len(kpts) and confs[idx] > self.confidence_threshold:
        points[name] = (kpts[idx][0], kpts[idx][1])
    else:
        points[name] = None

if points.get('body_center') is None:
    if confs[5] > self.confidence_threshold and confs[6] > self.confidence_threshold:
        left_shoulder = kpts[5]
        right_shoulder = kpts[6]
        body_center_x = (left_shoulder[0] + right_shoulder[0]) / 2
        body_center_y = (left_shoulder[1] + right_shoulder[1]) / 2
        points['body_center'] = (body_center_x, body_center_y)

'swim_direction': self.calculate_vector_angle(points.get('head'), points.get('tail'))
'body_tilt': self.calculate_vector_angle(points.get('head'), points.get('body_center'))
'caudal_fin_angle': self.calculate_three_point_angle(
    points.get('caudal_fin_left'),
    points.get('tail'),
    points.get('caudal_fin_right'))
'dorsal_fin_angle': self.calculate_vector_angle(
    points.get('body_center'), points.get('dorsal_fin'))
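To make the angle math concrete, here is a small worked example with made-up keypoint coordinates, using the same formulas as calculate_vector_angle and calculate_three_point_angle:

import math

# Hypothetical keypoints: head at (100, 100), tail at (200, 150)
head, tail = (100, 100), (200, 150)
dx, dy = tail[0] - head[0], tail[1] - head[1]
swim_direction = math.degrees(math.atan2(dy, dx)) % 360
print(f"Swim direction: {swim_direction:.1f} deg")  # about 26.6 deg (image y grows downward)

# Hypothetical caudal fin tips around the tail: angle measured at the tail vertex
left_fin, right_fin = (220, 130), (220, 170)
v1 = (left_fin[0] - tail[0], left_fin[1] - tail[1])
v2 = (right_fin[0] - tail[0], right_fin[1] - tail[1])
cos_theta = (v1[0] * v2[0] + v1[1] * v2[1]) / (math.hypot(*v1) * math.hypot(*v2))
print(f"Caudal fin angle: {math.degrees(math.acos(cos_theta)):.1f} deg")  # 90.0 deg for these points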
(3) Aggregation alarm: first, extract the center coordinates of each fish. Then check the number of fish and whether the average pairwise distance between them is below the threshold.
- Core judgment conditions: Average distance < 150 pixels and the number of fish ≥ 3.
- Anti-shake mechanism: An alarm is triggered only when a cluster is detected for five consecutive frames.
- Visualization: Draw red circles on the image to mark the cluster area (draw_analysis method).
centers = []
for result in results:
    for box, keypoints in zip(result.boxes, result.keypoints):
        x1, y1, x2, y2 = box.xyxy[0].numpy()
        center = ((x1 + x2) / 2, (y1 + y2) / 2)
        centers.append(center)

if len(centers) < self.min_fish_for_cluster:
    return False, None, None

total_distance = 0
count = 0
for i in range(len(centers)):
    for j in range(i + 1, len(centers)):
        dist = math.hypot(centers[i][0] - centers[j][0], centers[i][1] - centers[j][1])
        total_distance += dist
        count += 1
avg_distance = total_distance / count if count > 0 else float('inf')
cluster_found = avg_distance < self.cluster_threshold  # self.cluster_threshold = 150

if cluster_found:
    self.alarm_counter += 1
    if self.alarm_counter >= 5:
        self.alarm_active = True
        self.alarm_history.append({
            'frame': frame_count,
            'time': time.time(),
            'center': cluster_center,
            'radius': cluster_radius,
            'fish_count': current_count
        })
else:
    self.alarm_counter = max(0, self.alarm_counter - 1)
    if self.alarm_counter == 0:
        self.alarm_active = False
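A self-contained sketch of the same clustering rule, with hypothetical fish center coordinates, shows how the average pairwise distance drives the decision:

import math

def is_cluster(centers, threshold=150, min_fish=3):
    """Return True if the average pairwise distance of the centers is below the threshold."""
    if len(centers) < min_fish:
        return False
    dists = [math.hypot(a[0] - b[0], a[1] - b[1])
             for i, a in enumerate(centers) for b in centers[i + 1:]]
    return sum(dists) / len(dists) < threshold

# Hypothetical centers: three fish packed closely together satisfy the alarm condition
print(is_cluster([(100, 100), (140, 120), (120, 160)]))   # True  (average distance well under 150)
print(is_cluster([(100, 100), (400, 120), (120, 460)]))   # False (fish spread out)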
Since we need to obtain the camera feed from the UNIHIKER K10, replace the stream address assigned to the url variable at the start of the main program (url = 'http://192.168.11.72/stream') with the IP address of your own UNIHIKER K10.

Copy the code into the "fish.py" file created in Mind+. (Note: the complete "fish.py" file is provided as an attachment for reference.)
Click Run to start the program.

When a fish's body is fully visible in the frame, the program will detect the position of its body.
If a school of fish is shown in front of the camera, the fish school recognition results will be displayed in the video frame.
3.3 Task 3: UNIHIKER K10 Receives Results and Executes Control
Next, we will implement the last function. The UNIHIKER K10 will perform the corresponding actions based on the fish aggregation information received from SIoT, such as sounding an alarm when the fish are overly concentrated.
(1) Software Preparation
Make sure that Mind+ is opened and the UNIHIKER board has been successfully loaded. Once confirmed, you can proceed to write the complete project program.
(2)Write the Program
STEP One: Subscribe to SIoT Topics on UNIHIKER K10
Add an MQTT subscribe block and in the topic input field, enter "siot/fish".

STEP Two: Match the Corresponding Picture
Use the "When MQTT message received from topic_0" block to enable the smart terminal to process commands from siot/emo

Use the "if-then" block and the comparison operator to set the functionality of the UNIHIKER K10 based on the content of the MQTT message. If the MQTT message is equal to "a", the function of alerting when fish schools gather will be implemented.

Then, use the "Cache Local Image" and "Show Cached Content" modules to display the image on the screen of the UNIHIKER K10. When an abnormal aggregation of fish is detected, the words "Note anomalies in fish aggregations" will be seen to remind the user.

Select the pictures in the attachment folder and choose the ones printed with "Note anomalies in fish aggregations".

Then, use the "Play Built-in Music in the Background" and "Stop Background Playback" modules to sound an alarm when fish schools abnormally gather.

Below is the reference for the complete program.

Click the Upload button. When the upload progress reaches 100%, the program has been successfully uploaded.

STEP Three: Run and Test
(1) Click the Run button to start fish.py.
(2) The camera focuses on the school of fish:
- The UNIHIKER K10 captures video, and the analysis results of the fish school status are displayed in the video frames.
- When fish schools abnormally gather, an alarm is triggered.

4.Knowledge Hub
4.1 What is the Fish Keypoints Model and What are Its Applications?
The fish keypoint model is a technical model based on computer vision (CV) and deep learning. Its core function is to automatically identify and locate the key feature points (i.e., "keypoints") with biological significance on the fish's body through algorithms, and construct the spatial relationships between these points, thereby achieving quantitative analysis of the fish's morphology, behavior, and physiological state.
These "keypoints" are not randomly selected and usually correspond to the critical positions in the anatomy or behavior of fish, such as:
- Morphologically related: the tip of the snout (mouth), the center of the eyes, the starting point/apex/end point of the dorsal fin, the base/bifurcation point of the caudal fin, the starting point of the pelvic fin, the starting point of the anal fin, the posterior edge of the gill cover, the center of gravity of the body, etc.
- Behavior-related: the endpoints of the tail fin swing, the extreme points of the pectoral fin opening and closing, the rotation centers of the head turning, etc.

4.2 What are the ways to Detect the state of Fish Schools Based on Artificial Intelligence? What are their Applications?
Fish school status detection based on artificial intelligence mainly uses technologies such as computer vision and deep learning, and it is achieved by analyzing fish school images or videos. The main approaches include: 1. computer vision recognition; 2. AI deep learning analysis; 3. methods based on density estimation.

In life, the detection model of fish school status based on artificial intelligence has very wide applications, and it is the basis of the following practical applications:
- Fishery resource management: By real-time monitoring of the number, distribution and growth of fish stocks, it helps fishery managers adjust fishing operation strategies in a timely manner to avoid resource depletion caused by overfishing.
- Ecological protection: It helps to discover endangered species and potential populations of freshwater fish, providing strong support for protecting the ecological environment and biodiversity. For instance, in a certain reservoir, researchers used LUMA AI technology to conduct long-term monitoring of fish populations, discovered signs of some endangered fish species, and promptly took protective measures.
- Scientific research: It provides a large amount of accurate data to support studies of fish behavior, ecology, and related fields. Through long-term monitoring and analysis of the state of fish schools, researchers can gain a deep understanding of the behavioral patterns, ecological habits, and population dynamics of fish, providing basic data and theoretical support for related scientific research.
5.Appendix of Materials
Google drive link: https://drive.google.com/drive/folders/1IqrnQPb5RWaeqUlAw2t2JtI8x900sHxa?usp=drive_link









