image_to_pixle_params_yoloSAM/ultralytics-main/ultralytics/solutions/queue_management.py

96 lines
4.3 KiB
Python
Raw Permalink Normal View History

2025-07-14 17:36:53 +08:00
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
from typing import Any
from ultralytics.solutions.solutions import BaseSolution, SolutionAnnotator, SolutionResults
from ultralytics.utils.plotting import colors
class QueueManager(BaseSolution):
"""
Manages queue counting in real-time video streams based on object tracks.
This class extends BaseSolution to provide functionality for tracking and counting objects within a specified
region in video frames.
Attributes:
counts (int): The current count of objects in the queue.
rect_color (Tuple[int, int, int]): RGB color tuple for drawing the queue region rectangle.
region_length (int): The number of points defining the queue region.
track_line (List[Tuple[int, int]]): List of track line coordinates.
track_history (Dict[int, List[Tuple[int, int]]]): Dictionary storing tracking history for each object.
Methods:
initialize_region: Initialize the queue region.
process: Process a single frame for queue management.
extract_tracks: Extract object tracks from the current frame.
store_tracking_history: Store the tracking history for an object.
display_output: Display the processed output.
Examples:
>>> cap = cv2.VideoCapture("path/to/video.mp4")
>>> queue_manager = QueueManager(region=[100, 100, 200, 200, 300, 300])
>>> while cap.isOpened():
>>> success, im0 = cap.read()
>>> if not success:
>>> break
>>> results = queue_manager.process(im0)
"""
def __init__(self, **kwargs: Any) -> None:
"""Initialize the QueueManager with parameters for tracking and counting objects in a video stream."""
super().__init__(**kwargs)
self.initialize_region()
self.counts = 0 # Queue counts information
self.rect_color = (255, 255, 255) # Rectangle color for visualization
self.region_length = len(self.region) # Store region length for further usage
def process(self, im0) -> SolutionResults:
"""
Process queue management for a single frame of video.
Args:
im0 (numpy.ndarray): Input image for processing, typically a frame from a video stream.
Returns:
(SolutionResults): Contains processed image `im0`, 'queue_count' (int, number of objects in the queue) and
'total_tracks' (int, total number of tracked objects).
Examples:
>>> queue_manager = QueueManager()
>>> frame = cv2.imread("frame.jpg")
>>> results = queue_manager.process(frame)
"""
self.counts = 0 # Reset counts every frame
self.extract_tracks(im0) # Extract tracks from the current frame
annotator = SolutionAnnotator(im0, line_width=self.line_width) # Initialize annotator
annotator.draw_region(reg_pts=self.region, color=self.rect_color, thickness=self.line_width * 2) # Draw region
for box, track_id, cls, conf in zip(self.boxes, self.track_ids, self.clss, self.confs):
# Draw bounding box and counting region
annotator.box_label(box, label=self.adjust_box_label(cls, conf, track_id), color=colors(track_id, True))
self.store_tracking_history(track_id, box) # Store track history
# Cache frequently accessed attributes
track_history = self.track_history.get(track_id, [])
# Store previous position of track and check if the object is inside the counting region
prev_position = None
if len(track_history) > 1:
prev_position = track_history[-2]
if self.region_length >= 3 and prev_position and self.r_s.contains(self.Point(self.track_line[-1])):
self.counts += 1
# Display queue counts
annotator.queue_counts_display(
f"Queue Counts : {str(self.counts)}",
points=self.region,
region_color=self.rect_color,
txt_color=(104, 31, 17),
)
plot_im = annotator.result()
self.display_output(plot_im) # Display output with base class function
# Return a SolutionResults object with processed data
return SolutionResults(plot_im=plot_im, queue_count=self.counts, total_tracks=len(self.track_ids))