Node
MQTT 메세지 받아서 DB에 저장
알 수 없는 사용자
2023. 9. 25. 10:36
DB connection pool
let dbPool = null;
async function createPool() {
try {
dbPool = mysql.createPool({
host: '127.0.0.1',
user: 'root',
password: 'ekdnsel',
database: 'dawoon',
waitForConnections: true, // 연결이 사용 가능할 때까지 대기
connectionLimit: 1, // 연결 풀의 최대 연결 수
queueLimit: 0, // 대기열에 들어갈 연결 요청의 최대 수 (0은 무제한)
reconnect: {
autoReconnect: true, // 연결이 끊어졌을 때 자동 재연결 설정
maxReconnects: 3, // 최대 재시도 횟수 및 재시도 간격(ms)
reconnectInterval: 2000,
},
console.log('pool 생성');
} catch (error) {
console.error('pool 에러: ', error);
setTimeout(createPool, 5000);
}
}
createPool();
MQTT 연결
const options = {
host: '127.0.0.1',
port: 1883,
protocol: 'mqtt',
};
const client = mqtt.connect(options);
MQTT 연결확인
client.on("connect", () => {
log("MQTT 연결성공_connected: " + client.connected);
})
client.on("error", (error) => {
log("MQTT 연결실패_Can't connect" + error);
process.exit(1)
});
Topic에 subscribe
const topic = "#";
client.subscribe(topic);
dw_water2 테이블에 mqtt에서 보낸 데이터 저장 (CMD가 ALIVE, ERROR, CALCULATE일 때)
const save_dw_water2Handler = (topic, inp) => {
log('dw_water2에 저장');
let START_WATERTMPR = inp?.START_WATERTMPR;
let END_WATERTMPR = inp?.END_WATERTMPR;
if (inp.CMD === 'COWWATER_ERROR') {
START_WATERTMPR = inp?.WATERTMPR
END_WATERTMPR = 0;
}
// dw_water2 테이블에 넣을 데이터 중복인지 체크
let sqlSelct = `
select COUNT(*) CNT from dw_water2
where aniRFID = "${inp?.RFID}" AND CMD = "${inp?.CMD}" AND NOWTIME = "${inp?.NOWTIME}" AND CODE = "${inp?.CODE}" AND id = "${inp?.ID}"
`
log(sqlSelct);
dbPool.getConnection((err, connection) => {
if (err) {
console.error('연결 획득 중 오류: ', err);
return;
}
connection.query(sqlSelct,
(error, results, fields) => {
if (error) {
log(`dw_water2 DB 조회 실패_sqlSelct: ${error} aniRFID = "${inp?.RFID}", CMD = "${inp?.CMD}", NOWTIME = "${inp?.NOWTIME}", CODE = "${inp?.CODE}", id = "${inp?.ID}"`);
let para = { ...inp }
console.log('1111: ', inp)
para.DATE = inp.NOWTIME
message(topic, para, "FAIL", "dw_water2 조회 실패111");
connection.release();
return;
}
log('Query results:', results[0].CNT);
if (Number(results[0].CNT) <= 0) {
let sqlInsert = `INSERT INTO dw_water2
(aniRFID, CMD ,NOWTIME ,CODE ,ID ,IDX
,STABLE_NOW ,STATUS ,DATE ,ENTER_TIME ,VISIT_TIME
,WATER_CNT1 ,WATER_CNT2 ,START_WEIGHT ,END_WEIGHT ,START_WATERTMPR ,END_WATERTMPR
,ERROR_CODE ,ERROR_TIME)
VALUES("${inp?.RFID}","${inp?.CMD}", "${inp?.NOWTIME}", "${inp?.CODE}"
, "${inp?.ID}", "${inp?.INDEX}", "${inp?.STABLE_NOW}", "${inp?.STATUS}"
, "${inp?.DATE}", "${inp?.ENTER_TIME}", "${inp?.VISIT_TIME}", ${inp?.WATER_CNT1}, ${inp?.WATER_CNT2}
, ${inp?.START_WEIGHT}, ${inp?.END_WEIGHT}, ${START_WATERTMPR}, ${END_WATERTMPR}
, "${inp?.ERROR_CODE}", "${inp?.ERROR_TIME}") `
connection.query(sqlInsert,
(error, results, fields) => {
if (error) {
// log('DB 저장 실패', error)
log(`dw_water2 DB 저장 실패_sqlInsert: ${error} aniRFID = "${inp?.RFID}", CMD = "${inp?.CMD}", NOWTIME = "${inp?.NOWTIME}", CODE = "${inp?.CODE}", id = "${inp?.ID}"`)
let para = { ...inp }
para.DATE = inp.NOWTIME
message(topic, para, "FAIL", "dw_water2 저장 실패222");
connection.release();
return;
}
log(`dw_water2 DB 저장성공_sqlInsert: aniRFID = "${inp?.RFID}", CMD = "${inp?.CMD}", NOWTIME = "${inp?.NOWTIME}", CODE = "${inp?.CODE}", id = "${inp?.ID}"`);
let para = { ...inp }
para.DATE = inp.NOWTIME
message(topic, para, "SUCCESS", "dw_water2 저장 성공333");
connection.release();
return;
});
}
else {
log(`dw_water2 DB에 데이터 중복 발생: aniRFID = "${inp?.RFID}", CMD = "${inp?.CMD}", NOWTIME = "${inp?.NOWTIME}", CODE = "${inp?.CODE}", id = "${inp?.ID}"`);
let para = { ...inp }
para.DATE = inp.NOWTIME
message(topic, para, "DUP", "dw_water2 데이터 중복444");
connection.release();
return;
}
});
})
}
dw_water 테이블에 mqtt에서 보낸 데이터 저장 (CMD가 CALCULATE일 때)
const save_dw_waterHandler = (topic, inp) => {
log('dw_water에 저장');
// dw_water 테이블에 넣을 데이터 중복인지 체크
let sqlSelct_dw_water = `
select COUNT(*) CNT from dw_water
where aniRFID = "${inp?.RFID}" AND moveInDt = "${inp?.DATE}" AND moveInTm = "${inp?.ENTER_TIME}" AND waterStage = "${inp?.ID}"
`
log(sqlSelct_dw_water);
dbPool.getConnection((err, connection) => {
if (err) {
console.error('dw_water 연결 획득 중 오류: ', err);
return;
}
connection.query(sqlSelct_dw_water,
(error, results, fields) => {
console.log(88888888)
console.log('result[0].CNT: ', results[0].CNT);
if (Number(results[0].CNT) <= 0) {
let sql = `
CALL sp_water(
0, "${inp?.DATE}", "${inp?.ENTER_TIME}", "${inp?.VISIT_TIME}", ${inp?.START_WEIGHT}
, ${inp?.END_WEIGHT}, ${inp?.START_WEIGHT}-${inp?.END_WEIGHT}, ${inp?.WATER_CNT1}, ${inp?.WATER_CNT2}
, "${inp?.ID}", "${inp.START_WATERTMPR}", "${inp?.END_WATERTMPR}", "Y", null, "${inp?.RFID}")
`
console.log('sql: ', sql);
connection.query(sql,
(error, results, fields) => {
if (error) {
log(`dw_water DB 저장 실패_sql: ${error} aniRFID = "${inp?.RFID}", moveInDt = "${inp?.DATE}", moveInTm = "${inp?.ENTER_TIME}", waterStage = "${inp?.ID}"`);
let para = { ...inp }
para.DATE = inp.NOWTIME
message(topic, para, "FAIL", "dw_water 저장 실패555");
connection.release();
return;
}
else {
log(`dw_water DB 저장 성공_sql: aniRFID = "${inp?.RFID}", moveInDt = "${inp?.DATE}", moveInTm = "${inp?.ENTER_TIME}", waterStage = "${inp?.ID}"`);
let para = { ...inp }
para.DATE = inp.NOWTIME
message(topic, para, "SUCCESS", "dw_water 저장 성공666");
connection.release();
return;
}
});
}
else {
log(`dw_water DB에 데이터 중복 발생: aniRFID = "${inp?.RFID}", moveInDt = "${inp?.DATE}", moveInTm = "${inp?.ENTER_TIME}", waterStage = "${inp?.ID}"`);
let para = { ...inp }
para.DATE = inp.NOWTIME
message(topic, para, "DUP", "dw_water 데이터 중복777");
connection.release();
return;
}
}
)
});
}
메세지 받기 (구독한 메세지가 왔을 때, message 이벤트로 리스너를 만들어 처리 가능)
client.on('message', (topic, message, packet) => {
let obj = JSON.parse(message);
log('CMD: ', obj.CMD);
if (obj.CMD === 'COWWATER_ALIVE' || obj.CMD === 'COWWATER_ERROR' || obj.CMD === 'COWWATER_CALCULATE') {
save_dw_water2Handler(topic, obj);
}
if (obj.CMD === 'COWWATER_CALCULATE') {
save_dw_waterHandler(topic, obj);
}
});
MQTT Client를 실행시켜 message를 전송하면 DB에 메세지 값이 저장된다.