개발 노트

MQTT/web-worker 다중 컴포넌트 연동 본문

React

MQTT/web-worker 다중 컴포넌트 연동

한츄 2024. 2. 22. 18:14

1. Context를 사용한 전역관리

import React, { useEffect, useRef, createContext,useContext } from 'react';
import { useMqttStore } from '@/store/useMqttStore';

// 웹 워커를 공유하기 위한 컨텍스트 생성
export const WorkerContext = createContext<Worker | null>(null);

type Props ={
  children:React.ReactNode;
}
const currentTime =new Date().toLocaleString('ko-KR', { hour12: false, year: 'numeric', month: '2-digit', day: '2-digit', hour: '2-digit', minute: '2-digit', second: '2-digit' })
.replace(/. \d{4}/, '')
.replace(/(\d{4})\. (\d{2})\. (\d{2})\. (\d{2}):(\d{2}):(\d{2})/, '$1-$2-$3 $4:$5:$6')

// 웹 워커를 제공하는 프로바이더 컴포넌트
const WorkerProvider: React.FC<Props> = ({ children }) => {
  const workerRef = useRef<Worker>();
  const { addMessage } = useMqttStore();

  const extractTime = (message: string): string => {
    // 메시지에서 NOWTIME 또는 now 값을 찾기
    const NOWTIME = message.match(/"NOWTIME":"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"/)?.[1];
    const now = message.match(/"now":"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"/)?.[1];
  
    // 찾은 값 중에서 첫 번째 값을 반환하거나, 없으면 현재 시간을 반환
    return NOWTIME || now || currentTime;
  }
  

  useEffect(() => {
    const URL = '/mqttWorker.built.js';
    workerRef.current = new Worker(URL); // 웹 워커 초기화

    workerRef.current.onmessage = (event) => { // 웹 워커로부터 메시지를 받을 때
      const payload = {
        topic: event.data.topic,
        message: event.data.message,
        time: extractTime(event.data.message)
      };
      addMessage(payload); // Zustand 스토어에 메시지 추가
    };

    return () => {
      workerRef.current?.terminate(); // 컴포넌트 unmount시 웹 워커 종료
    };
        // eslint-disable-next-line react-hooks/exhaustive-deps
  }, []);

  return (
    <WorkerContext.Provider value={workerRef.current || null}>
      {children}
    </WorkerContext.Provider>
  );
};

export default WorkerProvider

 

외부에서 데이터가 계속 들어오고 관리되고 있었으면 한다고 하셔서 수정

 

2.zustand내부에 전체관리로직을 넣고 관리

import { create } from 'zustand';

export type Message = { topic: string; message: string; time: string };
type Store = {
  worker: Worker | null;
  messages: Message[];
  groupedMessages: { [key: string]: Message[] };
  sortedFarmCodes: string[];
  addMessage: (payload: Message) => void;
  findMessageByTopic: (topic: string) => Message | undefined;
  stopMqtt: () => void;
  subscribeMqtt: (topics: string[]) => void;
  groupAndSortMessages: () => void;
};

const currentTime = new Date()
  .toLocaleString('ko-KR', {
    hour12: false,
    year: 'numeric',
    month: '2-digit',
    day: '2-digit',
    hour: '2-digit',
    minute: '2-digit',
    second: '2-digit',
  })
  .replace(/. \d{4}/, '')
  .replace(/(\d{4})\. (\d{2})\. (\d{2})\. (\d{2}):(\d{2}):(\d{2})/, '$1-$2-$3 $4:$5:$6');

const extractTime = (message: string): string => {
  // 메시지에서 NOWTIME 또는 now 값을 찾기
  const NOWTIME = message.match(/"NOWTIME":"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"/)?.[1];
  const now = message.match(/"now":"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"/)?.[1];

  // 찾은 값 중에서 첫 번째 값을 반환하거나, 없으면 현재 시간을 반환
  return NOWTIME || now || currentTime;
};
export const useMqttStore = create<Store>((set, get) => {
  let worker: Worker | null = null;

  const startMqtt = () => {
    const URL = '/mqttWorker.built.js';
  
    try {
      worker = new Worker(URL);
    } catch (error) {
      console.error('웹 워커를 생성하는 데 실패했습니다:', error);
    }
  
    // 메시지 추가 함수
    const addMessage = (payload: Message) =>
      set((state) => {
        // 메시지 추가
        const messages = [...state.messages, payload];
        return { messages }; // 새로운 메시지 배열을 반환
      });
  
    if (worker) {
      worker.onmessage = function (event: MessageEvent<any>) {
        const payload = {
          topic: event.data.topic,
          message: event.data.message,
          time: extractTime(event.data.message),
        };
        addMessage(payload);
      };
  
      // 웹 워커에 메시지를 전송하여 모든 토픽을 구독합니다.
      worker.postMessage({
        CMD: 'MQTT_SUBSCRIBE',
        DATA: ['#'],
      });
      worker.onerror = function (error) {
        console.error('웹 워커에서 에러가 발생했습니다:', error);
      };
    }
  
    // 웹 워커가 준비되었을 때 worker 상태를 업데이트
    set({ worker });
  };
  
  // MQTT 시작
  startMqtt();

  return {
    worker,
    messages: [],
    groupedMessages: {},
    sortedFarmCodes: [],
    addMessage: (payload: Message) =>
      set((state) => {
        // 메시지 추가
        const messages = [...state.messages, payload];
        return { messages }; // 새로운 메시지 배열을 반환
      }),
    findMessageByTopic: (topic: string) => {
      const store = get();
      return store.messages.find((message) => message.topic === topic);
    },
    stopMqtt: () =>
      set((state) => {
        if (state.worker) {
          state.worker.terminate();
        }
        return { worker: null };
      }),
      subscribeMqtt: (topics: string[]) => {
        const { worker } = get();
      if (worker) {
        worker.postMessage({
          CMD: 'MQTT_SUBSCRIBE',
          DATA: topics,
        });
      }
    },
    groupAndSortMessages: () =>
      set((state) => {
        const groupedMessages = state.messages.reduce<{ [key: string]: Message[] }>((groups, message) => {
          const splitTopic = message.topic.split('/');
          const farmCode = splitTopic.length > 2 ? splitTopic[2] : '';

          // 시간 값을 추출
          const time = extractTime(message.message);

          if (!groups[farmCode]) {
            groups[farmCode] = [];
          }

          const parsedMessage = JSON.parse(message.message);
          const cmd = parsedMessage.CMD;

          if (
            !groups[farmCode].some((msg) => JSON.parse(msg.message).TOPIC === message.topic) &&
            cmd !== 'MAIN_ERROR' &&
            cmd !== 'SMART_ERROR'
          ) {
            // CMD가 MAIN_ERROR 또는 SMART_ERROR가 아닌 경우, 같은 CMD를 가진 이전 메시지를 제거
            groups[farmCode] = groups[farmCode].filter((msg) => {
              const parsedMsg = JSON.parse(msg.message);
              return parsedMsg.CMD !== cmd;
            });
          }

          groups[farmCode].push({ ...message, time });

          // 이 그룹의 메시지를 시간 순서대로 정렬
          return groups;
        }, {});

        const sortedFarmCodes = Object.entries(groupedMessages)
          .sort(([farmCodeA, messagesA], [farmCodeB, messagesB]) => {
            const lastMessageTimeA = messagesA[messagesA.length - 1].time;
            const lastMessageTimeB = messagesB[messagesB.length - 1].time;
            return new Date(lastMessageTimeB).getTime() - new Date(lastMessageTimeA).getTime(); // 내림차순 정렬
          })
          .map(([farmCode]) => farmCode);

        return { groupedMessages, sortedFarmCodes };
      }),
  };
});
  • 시작하자마자가 값을 받아오기위해 zustand내부에 시작하는 코드를 넣었으나 정상적으로 동작 하지않음
  • 값을 저장하기위해 useRef를 이용해야하는데 zustand내부에서는 관리할 수없어서 로직변경

3. zustand내부에 messages만 관리하고 나머지는 상위컴포넌트에서 실행

import { create } from 'zustand';

type Message = { topic: string; message: string };
type Store = {
  messages: Message[];
  addMessage: (payload: Message) => void;
  findMessageByTopic: (topic: string) => Message | undefined;
};

export const useMqttStore = create<Store>((set, get) => ({
  messages: [],
  addMessage: (payload: Message) => set((state) => ({ messages: [...state.messages, payload] })),
  findMessageByTopic: (topic: string) => {
    const store = get();
    return store.messages.find((message) => message.topic === topic);
  },
}));

 

//component내부

  useEffect(()=>{
    const URL = '/mqttWorker.built.js';
    const worker = new Worker(URL);
    workerRef.current = worker; // 웹 워커 객체를 ref에 저장
    worker.onmessage = function (event) {
      const payload = {
        topic: event.data.topic,
        message: event.data.message,
        time: extractTime(event.data.message)
      };
      addMessage(payload);
    };
    if (workerRef.current) {
      workerRef.current.postMessage({ CMD: 'MQTT_START' }); 
      workerRef.current.postMessage({
        CMD: 'MQTT_SUBSCIBE',
        DATA: ['#'],
      });
    }
        // eslint-disable-next-line react-hooks/exhaustive-deps
  },[])

3번에서 messages 배열을 저장한 후 다른 컴포넌트에서 호출

 

=> 3번방식으로 진행