개발 노트

MQTT 메세지 받아서 DB에 저장 본문

Node

MQTT 메세지 받아서 DB에 저장

알 수 없는 사용자 2023. 9. 25. 10:36

DB connection pool

let dbPool = null;

async function createPool() {
    try {
        dbPool = mysql.createPool({
            host: '127.0.0.1',
            user: 'root',
            password: 'ekdnsel',
            database: 'dawoon',
            waitForConnections: true, // 연결이 사용 가능할 때까지 대기
            connectionLimit: 1, // 연결 풀의 최대 연결 수
            queueLimit: 0, // 대기열에 들어갈 연결 요청의 최대 수 (0은 무제한)
            reconnect: {
                autoReconnect: true, // 연결이 끊어졌을 때 자동 재연결 설정
                maxReconnects: 3, // 최대 재시도 횟수 및 재시도 간격(ms)
                reconnectInterval: 2000,
            },
        console.log('pool 생성');
    } catch (error) {
        console.error('pool 에러: ', error);
        setTimeout(createPool, 5000);
    }
}
createPool();

 

 

MQTT  연결

const options = {
    host: '127.0.0.1',
    port: 1883,
    protocol: 'mqtt',
};
const client = mqtt.connect(options);

 

 

MQTT 연결확인

client.on("connect", () => {
    log("MQTT 연결성공_connected: " + client.connected);
})

client.on("error", (error) => {
    log("MQTT 연결실패_Can't connect" + error);
    process.exit(1)
});

 

 

Topic에 subscribe

const topic = "#";
client.subscribe(topic);

 

 

dw_water2 테이블에 mqtt에서 보낸 데이터 저장 (CMD가 ALIVE, ERROR, CALCULATE일 때)

const save_dw_water2Handler = (topic, inp) => {

    log('dw_water2에 저장');

    let START_WATERTMPR = inp?.START_WATERTMPR;
    let END_WATERTMPR = inp?.END_WATERTMPR;

    if (inp.CMD === 'COWWATER_ERROR') {
        START_WATERTMPR = inp?.WATERTMPR
        END_WATERTMPR = 0;
    }

    // dw_water2 테이블에 넣을 데이터 중복인지 체크
    let sqlSelct = `
        select COUNT(*) CNT from dw_water2
        where aniRFID = "${inp?.RFID}" AND CMD = "${inp?.CMD}" AND NOWTIME = "${inp?.NOWTIME}" AND CODE = "${inp?.CODE}" AND id = "${inp?.ID}"
    `
    log(sqlSelct);


    dbPool.getConnection((err, connection) => {
        if (err) {
            console.error('연결 획득 중 오류: ', err);
            return;
        }

        connection.query(sqlSelct,
            (error, results, fields) => {
                if (error) {
                    log(`dw_water2 DB 조회 실패_sqlSelct: ${error} aniRFID = "${inp?.RFID}", CMD = "${inp?.CMD}", NOWTIME = "${inp?.NOWTIME}", CODE = "${inp?.CODE}", id = "${inp?.ID}"`);


                    let para = { ...inp }
                    console.log('1111: ', inp)
                    para.DATE = inp.NOWTIME
                    message(topic, para, "FAIL", "dw_water2 조회 실패111");

                    connection.release();
                    return;
                }

                log('Query results:', results[0].CNT);

                if (Number(results[0].CNT) <= 0) {
                    let sqlInsert = `INSERT INTO dw_water2
                    (aniRFID, CMD ,NOWTIME ,CODE ,ID ,IDX
                    ,STABLE_NOW ,STATUS ,DATE ,ENTER_TIME ,VISIT_TIME
                    ,WATER_CNT1 ,WATER_CNT2 ,START_WEIGHT ,END_WEIGHT ,START_WATERTMPR ,END_WATERTMPR
                    ,ERROR_CODE ,ERROR_TIME)
                    VALUES("${inp?.RFID}","${inp?.CMD}", "${inp?.NOWTIME}", "${inp?.CODE}"
                    , "${inp?.ID}", "${inp?.INDEX}", "${inp?.STABLE_NOW}", "${inp?.STATUS}"
                    , "${inp?.DATE}", "${inp?.ENTER_TIME}", "${inp?.VISIT_TIME}", ${inp?.WATER_CNT1}, ${inp?.WATER_CNT2}
                    , ${inp?.START_WEIGHT}, ${inp?.END_WEIGHT}, ${START_WATERTMPR}, ${END_WATERTMPR}
                    , "${inp?.ERROR_CODE}", "${inp?.ERROR_TIME}") `
                    connection.query(sqlInsert,
                        (error, results, fields) => {
                            if (error) {
                                // log('DB 저장 실패', error)
                                log(`dw_water2 DB 저장 실패_sqlInsert: ${error} aniRFID = "${inp?.RFID}", CMD = "${inp?.CMD}", NOWTIME = "${inp?.NOWTIME}", CODE = "${inp?.CODE}", id = "${inp?.ID}"`)


                                let para = { ...inp }
                                para.DATE = inp.NOWTIME
                                message(topic, para, "FAIL", "dw_water2 저장 실패222");

                                connection.release();
                                return;
                            }
                            log(`dw_water2 DB 저장성공_sqlInsert: aniRFID = "${inp?.RFID}", CMD = "${inp?.CMD}", NOWTIME = "${inp?.NOWTIME}", CODE = "${inp?.CODE}", id = "${inp?.ID}"`);

                            let para = { ...inp }
                            para.DATE = inp.NOWTIME
                            message(topic, para, "SUCCESS", "dw_water2 저장 성공333");

                            connection.release();
                            return;
                        });
                }
                else {
                    log(`dw_water2 DB에 데이터 중복 발생: aniRFID = "${inp?.RFID}", CMD = "${inp?.CMD}", NOWTIME = "${inp?.NOWTIME}", CODE = "${inp?.CODE}", id = "${inp?.ID}"`);
                    let para = { ...inp }
                    para.DATE = inp.NOWTIME
                    message(topic, para, "DUP", "dw_water2 데이터 중복444");

                    connection.release();
                    return;
                }
            });
    })
}

 

 

dw_water 테이블에 mqtt에서 보낸 데이터 저장 (CMD가 CALCULATE일 때)

const save_dw_waterHandler = (topic, inp) => {
    log('dw_water에 저장');


    // dw_water 테이블에 넣을 데이터 중복인지 체크
    let sqlSelct_dw_water = `
        select COUNT(*) CNT from dw_water
        where aniRFID = "${inp?.RFID}" AND moveInDt = "${inp?.DATE}" AND moveInTm = "${inp?.ENTER_TIME}" AND waterStage = "${inp?.ID}"
    `
    log(sqlSelct_dw_water);

    dbPool.getConnection((err, connection) => {
        if (err) {
            console.error('dw_water 연결 획득 중 오류: ', err);
            return;
        }

        connection.query(sqlSelct_dw_water,
            (error, results, fields) => {
                console.log(88888888)
                console.log('result[0].CNT: ', results[0].CNT);
                if (Number(results[0].CNT) <= 0) {
                    let sql = `
                    CALL sp_water(
                        0, "${inp?.DATE}", "${inp?.ENTER_TIME}", "${inp?.VISIT_TIME}", ${inp?.START_WEIGHT}
                        , ${inp?.END_WEIGHT}, ${inp?.START_WEIGHT}-${inp?.END_WEIGHT}, ${inp?.WATER_CNT1}, ${inp?.WATER_CNT2}
                        , "${inp?.ID}", "${inp.START_WATERTMPR}", "${inp?.END_WATERTMPR}", "Y", null, "${inp?.RFID}")
                    `
                    console.log('sql: ', sql);

                    connection.query(sql,
                        (error, results, fields) => {
                            if (error) {
                                log(`dw_water DB 저장 실패_sql: ${error} aniRFID = "${inp?.RFID}", moveInDt = "${inp?.DATE}", moveInTm = "${inp?.ENTER_TIME}", waterStage = "${inp?.ID}"`);

                                let para = { ...inp }
                                para.DATE = inp.NOWTIME
                                message(topic, para, "FAIL", "dw_water 저장 실패555");

                                connection.release();
                                return;
                            }
                            else {
                                log(`dw_water DB 저장 성공_sql: aniRFID = "${inp?.RFID}", moveInDt = "${inp?.DATE}", moveInTm = "${inp?.ENTER_TIME}", waterStage = "${inp?.ID}"`);
                                let para = { ...inp }
                                para.DATE = inp.NOWTIME
                                message(topic, para, "SUCCESS", "dw_water 저장 성공666");

                                connection.release();
                                return;
                            }
                        });
                }
                else {
                    log(`dw_water DB에 데이터 중복 발생: aniRFID = "${inp?.RFID}", moveInDt = "${inp?.DATE}", moveInTm = "${inp?.ENTER_TIME}", waterStage = "${inp?.ID}"`);
                    let para = { ...inp }
                    para.DATE = inp.NOWTIME
                    message(topic, para, "DUP", "dw_water 데이터 중복777");

                    connection.release();
                    return;
                }

            }
        )

    });
}

 

 

메세지 받기 (구독한 메세지가 왔을 때, message 이벤트로 리스너를 만들어 처리 가능)

client.on('message', (topic, message, packet) => {
    let obj = JSON.parse(message);
    log('CMD: ', obj.CMD);

    if (obj.CMD === 'COWWATER_ALIVE' || obj.CMD === 'COWWATER_ERROR' || obj.CMD === 'COWWATER_CALCULATE') {
        save_dw_water2Handler(topic, obj);
    }
    if (obj.CMD === 'COWWATER_CALCULATE') {
        save_dw_waterHandler(topic, obj);
    }
});

 

 

MQTT Client를 실행시켜 message를 전송하면 DB에 메세지 값이 저장된다.