RFID 閘門與人類方向偵測系統

RFID 閘門與人類方向偵測系統。該系統是一個具有人類方向偵測功能的 RFID 閘門系統。系統使用 Spring Boot 作為主伺服器,通過純 Socket 與 RFID 閱讀器連接,並利用 FastAPI 的 WebSocket 進行 YOLO 偵測。系統能夠使用 YOLO 偵測人類的移動方向,並相應地記錄物品的訪問方向。

功能

  • RFID 閱讀器 + Java(純 Socket 通訊):使用純 Socket 通訊與 RFID 閱讀器進行通訊,以有效處理低規格硬體的數據。
  • 主伺服器(Java Spring Boot):通過純 Socket 從 RFID 閱讀器讀取數據,將數據儲存到 Redis,並記錄到 MongoDB,同時處理前端的操作。能夠通過 WebSocket 與其他微服務進行通訊。
  • Python Flask 伺服器:整合 YOLO 和 DeepSort tracker,以偵測人員的左右方向,並通過 REST API 將方向回傳給主伺服器。
  • React 前端:顯示實時相機影像和表格數據。允許修改 RFID 天線的功率,並使用 WebSocket 與主伺服器進行通訊。
  • Python 喇叭腳本:根據掃描到的 RFID 標籤數量播放聲音。使用 WebSocket 與主伺服器進行通訊。

工作流程

  1. 用戶互動:攜帶有 RFID 標籤的物品的用戶從右側或左側接近閘門。
  2. RFID 掃描:RFID 天線持續掃描 RFID 標籤,將其記錄在 Redis 中並新增到 MongoDB 進行日誌記錄。
  3. 數據記錄:當 RFID 標籤被掃描時,會將其存儲在 Redis 中,生命週期為 6 秒,對應於掃描區域長度。
  4. 方向偵測:如果 Redis 池中存在 RFID 標籤,並且從 Flask 伺服器接收到人類方向數據,系統將從 Redis 池中檢索所有相關的 RFID 數據並保存到 MongoDB。這些標籤將被移至 30 秒的冷卻池,以防止重複掃描。
  5. 重複防止:冷卻機制確保在冷卻期間 RFID 標籤被自動過濾,防止在檢測到移動後進行重複掃描。
  6. 前端顯示:用戶可以在 React 前端實時查看記錄結果,包括相機影像和表格數據。
  7. 日誌訪問:用戶可以訪問所有日誌數據和數據傳輸歷史,因為主伺服器也提供對第三方服務的 API 調用。
  8. 天線控制:用戶可以通過網頁直接調整 RFID 天線功率或開關天線。

技術

  • RFID 閱讀器:用於掃描 RFID 標籤的硬體元件。
  • Java:用於與 RFID 閱讀器的 Socket 通訊。
  • Java Spring Boot:主伺服器的後端框架。
  • Redis:用於快速數據訪問的內存數據存儲。
  • MongoDB:用於日誌記錄和數據儲存的 NoSQL 數據庫。
  • Python Flask:用於 YOLO 偵測的後端框架。
  • YOLO(You Only Look Once):實時物體偵測系統。
  • DeepSort Tracker:物體追蹤算法。
  • React:用於構建用戶界面的前端庫。
  • WebSocket:用於前後端之間實時通訊的協議。
  • Python:用於腳本化喇叭功能。

開發見解

  • WebSocket 實時視頻串流:使用 WebSocket 實現實時視頻串流遇到了延遲和卡頓的挑戰。最終,通過 Python Flask 串流影像並在 React 前端呈現,提供了更可靠的解決方案。
  • 低規格 RFID 閱讀器限制:RFID 閱讀器的 RAM 和 CPU 資源有限,因此必須使用純 Socket 通訊,而不能使用需要額外庫和資源的 WebSocket。
  • 多人移動偵測挑戰:即使使用 DeepSort 追蹤,當多於兩人從相反方向同時通過閘門時,由於 RFID 閱讀器無法獲取 x、y、z 軸數據,仍然難以準確偵測。
  • 攝像頭偵測穩定性:當人員移動迅速時,YOLO 有時無法正確偵測人類,導致方向計算不準確。實施了緩衝區(x 軸的 0-10 和 90-100)以提高偵測準確性,防止錯誤的移動觸發。

tracks = tracker.update_tracks(detections, frame=frame)

current_ids = set()
for track in tracks:
    if not track.is_confirmed() or track.time_since_update > 1:
        continue

    track_id = track.track_id
    current_ids.add(track_id)
    bbox = track.to_tlbr()
    x1, y1, x2, y2 = map(int, bbox)  # Human Box Coordinate
    object_center = ((x1 + x2) // 2, (y1 + y2) // 2)

    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
    cv2.putText(frame, f"ID: {track_id}", (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    if track_id not in movement_states:  # Cannot just calculate the movement with 'perfect' in and exist axis
        if object_center[0] < frame_width // 3:
            movement_states[track_id] = {
                'start': 'left', 'end': None, 'detected': True, 'direction': None, 'enterTime': datetime.now(timezone.utc)}
        elif object_center[0] > frame_width * 2 // 3:
            movement_states[track_id] = {
                'start': 'right', 'end': None, 'detected': True, 'direction': None, 'enterTime': datetime.now(timezone.utc)}
        else:
            movement_states[track_id] = {
                'start': 'middle', 'end': None, 'detected': True, 'direction': None, 'enterTime': datetime.now(timezone.utc)}

    state = movement_states[track_id]
    if object_center[0] > frame_width * 2 // 3:
        state['end'] = 'right'
        state['direction'] = 'right'
    elif object_center[0] < frame_width // 3:
        state['end'] = 'left'
        state['direction'] = 'left'

    state['detected'] = True
  • 處理不完整的移動:需要管理從一側進入但未通過閘門而返回的情況,確保不會錯誤地觸發移動。通過 DeepSort 追蹤和移動狀態管理,確保只有從指定起始側到結束側的移動被識別。
for track_id in list(movement_states.keys()):
    state = movement_states[track_id]
    if track_id not in current_ids:
        state['detected'] = False
        if has_disappeared(track_id, movement_states):
            enter_time = state['enterTime']
            exit_time = datetime.now(timezone.utc)

            # Check if starting from left and need to leave from right
            if state['start'] == 'left' and state['end'] == 'right':
                print(f"Person {track_id} moved from Left to Right")
                asyncio.run(send_movement_data(
                    uri_for_movement, "right", enter_time, exit_time))
            elif state['start'] == 'right' and state['end'] == 'left':
                print(f"Person {track_id} moved from Right to Left")
                asyncio.run(send_movement_data(
                    uri_for_movement, "left", enter_time, exit_time))
            del movement_states[track_id]