def continuous_acquire_events(self, event_callback=None):
    """Stream events from the camera and emit rendered frames for display.

    Events are read from ``self.cam`` in 10 ms slices and fed to a
    PeriodicFrameGenerationAlgorithm (50 ms accumulation, 20 fps, dark
    palette). Every frame the algorithm produces is emitted through the
    ``events_acquired`` signal. The loop runs until ``self.isPreviewing``
    becomes false or the event stream ends; ``self.isPreviewing`` is always
    reset to False on exit.

    Parameters:
        event_callback (callable, optional): Called with each non-empty event
            slice before frame conversion. It should return a structured
            event array; any non-ndarray return value (including None)
            causes that slice to be skipped.

    Exceptions:
        Any exception raised while streaming is logged with its traceback
        and then re-raised, after the preview flag has been reset.
    """
    # Sensor geometry is required to size the generated frames.
    if self.i_geometry is None:
        logger.error("Geometry module is not available")
        return
    height = self.i_geometry.get_height()
    width = self.i_geometry.get_width()

    frame_gen = PeriodicFrameGenerationAlgorithm(
        sensor_width=width,
        sensor_height=height,
        accumulation_time_us=50000,  # 50 ms accumulation time
        fps=20.0,  # 20 frames per second
        palette=ColorPalette.Dark,  # Use dark color palette
    )

    def frame_callback(_ts, frame_data):
        # Emit a copy: the generator presumably reuses its output buffer,
        # and a queued signal may be delivered after the next frame has
        # been rendered into it -- TODO confirm against the SDK.
        self.events_acquired.emit(frame_data.copy())

    frame_gen.set_output_callback(frame_callback)

    iterator = None
    try:
        # Stream events from the device in fixed 10 ms time slices.
        iterator = EventsIterator.from_device(
            device=self.cam,
            delta_t=10000,  # 10 ms time slice
            mode='delta_t',
            max_duration=None,
            relative_timestamps=False  # todo: Should this be set to True for each new recording?
        )
        for events in iterator:
            if events.size > 0:
                if event_callback is not None:
                    events = event_callback(events)
                # isinstance() also rejects None returned by the callback.
                if isinstance(events, np.ndarray):
                    frame_gen.process_events(events)
            # Check the stop flag on every slice, including empty or
            # rejected ones, so a stop request is honored even while no
            # usable events are arriving.
            if not self.isPreviewing:
                break
        # Force generation of the last frame when the stream ends.
        frame_gen.force_generate()
    except Exception:
        logger.exception("Continuous event acquisition failed")
        raise
    finally:
        self.isPreviewing = False
        # Drop the local reference explicitly so a propagating traceback
        # does not keep the iterator alive through this frame's locals.
        del iterator