Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
Tags
- setInterval clear
- mosquitto.conf
- mosquitto
- setInterval 외부 정지
- pm2 설치
- pm2 시작
- 서버동기화
- map이 undefined가 뜰 때
- transfer
- 맥 어드레스
- pm2 상태 확인
- 공인IP
- datagridview 직접입력
- DatePicker
- c# datagridview 데이터 넣기
- 1883
- DataGridView 직접 입력
- pm2 확인
- setInterval 정지
- allow_anonymouse
- timepicker
- 데이터테이블 데이터 넣기
- listener 1883
- Replication
- mySQL_Replication
- AntDesign
- invalid data
- pm2
- setInterval 중지
- html #select #option #multiple
Archives
- Today
- Total
개발 노트
MQTT 메세지 받아서 DB에 저장 본문
DB connection pool
let dbPool = null;
async function createPool() {
try {
dbPool = mysql.createPool({
host: '127.0.0.1',
user: 'root',
password: 'ekdnsel',
database: 'dawoon',
waitForConnections: true, // 연결이 사용 가능할 때까지 대기
connectionLimit: 1, // 연결 풀의 최대 연결 수
queueLimit: 0, // 대기열에 들어갈 연결 요청의 최대 수 (0은 무제한)
reconnect: {
autoReconnect: true, // 연결이 끊어졌을 때 자동 재연결 설정
maxReconnects: 3, // 최대 재시도 횟수 및 재시도 간격(ms)
reconnectInterval: 2000,
},
console.log('pool 생성');
} catch (error) {
console.error('pool 에러: ', error);
setTimeout(createPool, 5000);
}
}
createPool();
MQTT 연결
const options = {
host: '127.0.0.1',
port: 1883,
protocol: 'mqtt',
};
const client = mqtt.connect(options);
MQTT 연결확인
client.on("connect", () => {
log("MQTT 연결성공_connected: " + client.connected);
})
client.on("error", (error) => {
log("MQTT 연결실패_Can't connect" + error);
process.exit(1)
});
Topic에 subscribe
const topic = "#";
client.subscribe(topic);
dw_water2 테이블에 mqtt에서 보낸 데이터 저장 (CMD가 ALIVE, ERROR, CALCULATE일 때)
const save_dw_water2Handler = (topic, inp) => {
log('dw_water2에 저장');
let START_WATERTMPR = inp?.START_WATERTMPR;
let END_WATERTMPR = inp?.END_WATERTMPR;
if (inp.CMD === 'COWWATER_ERROR') {
START_WATERTMPR = inp?.WATERTMPR
END_WATERTMPR = 0;
}
// dw_water2 테이블에 넣을 데이터 중복인지 체크
let sqlSelct = `
select COUNT(*) CNT from dw_water2
where aniRFID = "${inp?.RFID}" AND CMD = "${inp?.CMD}" AND NOWTIME = "${inp?.NOWTIME}" AND CODE = "${inp?.CODE}" AND id = "${inp?.ID}"
`
log(sqlSelct);
dbPool.getConnection((err, connection) => {
if (err) {
console.error('연결 획득 중 오류: ', err);
return;
}
connection.query(sqlSelct,
(error, results, fields) => {
if (error) {
log(`dw_water2 DB 조회 실패_sqlSelct: ${error} aniRFID = "${inp?.RFID}", CMD = "${inp?.CMD}", NOWTIME = "${inp?.NOWTIME}", CODE = "${inp?.CODE}", id = "${inp?.ID}"`);
let para = { ...inp }
console.log('1111: ', inp)
para.DATE = inp.NOWTIME
message(topic, para, "FAIL", "dw_water2 조회 실패111");
connection.release();
return;
}
log('Query results:', results[0].CNT);
if (Number(results[0].CNT) <= 0) {
let sqlInsert = `INSERT INTO dw_water2
(aniRFID, CMD ,NOWTIME ,CODE ,ID ,IDX
,STABLE_NOW ,STATUS ,DATE ,ENTER_TIME ,VISIT_TIME
,WATER_CNT1 ,WATER_CNT2 ,START_WEIGHT ,END_WEIGHT ,START_WATERTMPR ,END_WATERTMPR
,ERROR_CODE ,ERROR_TIME)
VALUES("${inp?.RFID}","${inp?.CMD}", "${inp?.NOWTIME}", "${inp?.CODE}"
, "${inp?.ID}", "${inp?.INDEX}", "${inp?.STABLE_NOW}", "${inp?.STATUS}"
, "${inp?.DATE}", "${inp?.ENTER_TIME}", "${inp?.VISIT_TIME}", ${inp?.WATER_CNT1}, ${inp?.WATER_CNT2}
, ${inp?.START_WEIGHT}, ${inp?.END_WEIGHT}, ${START_WATERTMPR}, ${END_WATERTMPR}
, "${inp?.ERROR_CODE}", "${inp?.ERROR_TIME}") `
connection.query(sqlInsert,
(error, results, fields) => {
if (error) {
// log('DB 저장 실패', error)
log(`dw_water2 DB 저장 실패_sqlInsert: ${error} aniRFID = "${inp?.RFID}", CMD = "${inp?.CMD}", NOWTIME = "${inp?.NOWTIME}", CODE = "${inp?.CODE}", id = "${inp?.ID}"`)
let para = { ...inp }
para.DATE = inp.NOWTIME
message(topic, para, "FAIL", "dw_water2 저장 실패222");
connection.release();
return;
}
log(`dw_water2 DB 저장성공_sqlInsert: aniRFID = "${inp?.RFID}", CMD = "${inp?.CMD}", NOWTIME = "${inp?.NOWTIME}", CODE = "${inp?.CODE}", id = "${inp?.ID}"`);
let para = { ...inp }
para.DATE = inp.NOWTIME
message(topic, para, "SUCCESS", "dw_water2 저장 성공333");
connection.release();
return;
});
}
else {
log(`dw_water2 DB에 데이터 중복 발생: aniRFID = "${inp?.RFID}", CMD = "${inp?.CMD}", NOWTIME = "${inp?.NOWTIME}", CODE = "${inp?.CODE}", id = "${inp?.ID}"`);
let para = { ...inp }
para.DATE = inp.NOWTIME
message(topic, para, "DUP", "dw_water2 데이터 중복444");
connection.release();
return;
}
});
})
}
dw_water 테이블에 mqtt에서 보낸 데이터 저장 (CMD가 CALCULATE일 때)
const save_dw_waterHandler = (topic, inp) => {
log('dw_water에 저장');
// dw_water 테이블에 넣을 데이터 중복인지 체크
let sqlSelct_dw_water = `
select COUNT(*) CNT from dw_water
where aniRFID = "${inp?.RFID}" AND moveInDt = "${inp?.DATE}" AND moveInTm = "${inp?.ENTER_TIME}" AND waterStage = "${inp?.ID}"
`
log(sqlSelct_dw_water);
dbPool.getConnection((err, connection) => {
if (err) {
console.error('dw_water 연결 획득 중 오류: ', err);
return;
}
connection.query(sqlSelct_dw_water,
(error, results, fields) => {
console.log(88888888)
console.log('result[0].CNT: ', results[0].CNT);
if (Number(results[0].CNT) <= 0) {
let sql = `
CALL sp_water(
0, "${inp?.DATE}", "${inp?.ENTER_TIME}", "${inp?.VISIT_TIME}", ${inp?.START_WEIGHT}
, ${inp?.END_WEIGHT}, ${inp?.START_WEIGHT}-${inp?.END_WEIGHT}, ${inp?.WATER_CNT1}, ${inp?.WATER_CNT2}
, "${inp?.ID}", "${inp.START_WATERTMPR}", "${inp?.END_WATERTMPR}", "Y", null, "${inp?.RFID}")
`
console.log('sql: ', sql);
connection.query(sql,
(error, results, fields) => {
if (error) {
log(`dw_water DB 저장 실패_sql: ${error} aniRFID = "${inp?.RFID}", moveInDt = "${inp?.DATE}", moveInTm = "${inp?.ENTER_TIME}", waterStage = "${inp?.ID}"`);
let para = { ...inp }
para.DATE = inp.NOWTIME
message(topic, para, "FAIL", "dw_water 저장 실패555");
connection.release();
return;
}
else {
log(`dw_water DB 저장 성공_sql: aniRFID = "${inp?.RFID}", moveInDt = "${inp?.DATE}", moveInTm = "${inp?.ENTER_TIME}", waterStage = "${inp?.ID}"`);
let para = { ...inp }
para.DATE = inp.NOWTIME
message(topic, para, "SUCCESS", "dw_water 저장 성공666");
connection.release();
return;
}
});
}
else {
log(`dw_water DB에 데이터 중복 발생: aniRFID = "${inp?.RFID}", moveInDt = "${inp?.DATE}", moveInTm = "${inp?.ENTER_TIME}", waterStage = "${inp?.ID}"`);
let para = { ...inp }
para.DATE = inp.NOWTIME
message(topic, para, "DUP", "dw_water 데이터 중복777");
connection.release();
return;
}
}
)
});
}
메세지 받기 (구독한 메세지가 왔을 때, message 이벤트로 리스너를 만들어 처리 가능)
client.on('message', (topic, message, packet) => {
let obj = JSON.parse(message);
log('CMD: ', obj.CMD);
if (obj.CMD === 'COWWATER_ALIVE' || obj.CMD === 'COWWATER_ERROR' || obj.CMD === 'COWWATER_CALCULATE') {
save_dw_water2Handler(topic, obj);
}
if (obj.CMD === 'COWWATER_CALCULATE') {
save_dw_waterHandler(topic, obj);
}
});
MQTT Client를 실행시켜 message를 전송하면 DB에 메세지 값이 저장된다.
'Node' 카테고리의 다른 글
Node js 시작하기 (0) | 2023.11.08 |
---|---|
MQTT 메세지 받아서 DB에 저장 - 프로그램 시작 시 DB가 연결되지 않은 경우 (0) | 2023.09.25 |
mySQL에서 connection pool 사용 (0) | 2023.09.21 |
nodejs - winston 모듈 (0) | 2023.09.20 |
winston 모듈 - 서버 로그 관리 (0) | 2023.09.19 |