001package org.opengion.plugin.daemon;
002
003import java.util.Date;
004
005import javax.jms.QueueSession;
006
007import org.hsqldb.lib.StringUtil;
008import org.opengion.fukurou.db.DBUtil;
009import org.opengion.fukurou.queue.QueueInfo;
010import org.opengion.fukurou.queue.QueueSend;
011import org.opengion.fukurou.queue.QueueSendFactory;
012import org.opengion.fukurou.util.ApplicationInfo;
013import org.opengion.fukurou.util.HybsTimerTask;
014import org.opengion.hayabusa.common.HybsSystem;
015
016/**
017 * メッセージキュー送信
018 * メッセージキュー送信テーブルを監視して、
019 * 送信処理を行います。
020 * 
021 * @og.group メッセージ連携
022 * 
023 * @version 5.0
024 * @author oota
025 * @since JDK7
026 *
027 */
028public class Daemon_QueueSend extends HybsTimerTask {
029        private int loopCnt = 0;
030        private static final int LOOP_COUNTER = 24;
031        private QueueSend queueSend;
032
033        private String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID");
034        private static final ApplicationInfo appInfo = null;
035        private static final String DBID = HybsSystem.sys( "RESOURCE_DBID" );
036        private static final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys( "CLOUD_SQS_ACCESS_KEY" );
037        private static final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys( "CLOUD_SQS_SECRET_KEY" );
038        
039        /**
040         * 開始処理
041         * タイマータスクのデーモン処理の開始ポイントです。
042         */
043        @Override
044        protected void startDaemon() {
045                if (loopCnt % LOOP_COUNTER == 0) {
046                        loopCnt = 1;
047                        System.out.println();
048                        System.out.println(toString() + " " + new Date() + "");
049                } else {
050                        // メッセージキュー送信管理テーブルから、送信対象のレコードを取得
051                        String sql = "select a.system_id, a.ykno, a.queueid, a.message, a.sfdupid, b.quesyu, b.jmsurl from ge66 a"
052                                                + " inner join ge65 b on a.system_id = b.system_id and a.queueid = b.queueid and b.fgj = '1'"
053                                                + " where a.system_id = ? and a.fgkan = '1' and a.fgj = '1'";
054                        String[][] vals = DBUtil.dbExecute(sql, new String[] {SYSTEM_ID}, appInfo, DBID);
055                        
056                        // 取得データ分の繰り返し処理を実行する
057                        for(int i = 0; i  < vals.length; i++) {
058                                String[] record = vals[i];
059                                
060                                String systemId = record[0];
061                                String ykno =  record[1];
062                                String queueId = record[2];
063                                String message = record[3];
064                                String dedupliId = record[4];
065                                String queSyu = record[5];
066                                String jmsUrl = record[6];
067                                
068                                String queueType = queSyu.toUpperCase();
069                                queueSend = QueueSendFactory.newQueueSend(queueType);
070        
071                                // 接続処理
072                                queueSend.connect(jmsUrl, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY);
073                                
074                                // メッセージ送信管理テーブルから取得したデータを送信実装予定
075                                QueueInfo queueInfo = new QueueInfo();
076                                
077                                // 応答確認種別
078                                if("MQ".equals(queueType)){
079                                        // MQメッセージサーバ指定時
080                                        queueInfo.setMqTransacted(false);
081                                        queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE);
082                                        // キュー名
083                                        queueInfo.setMqQueueName(queueId);
084                                }else if("SQS".equals(queueType)){
085                                        // SQSメッセージサーバ指定時
086                                        // グループID
087                                        queueInfo.setSqsFifoGroupId(queueId);
088                                        if(!StringUtil.isEmpty(dedupliId)) {
089                                                // 重複排除ID
090                                                // コンテンツに基づく重複排除が有効時は、未設定でも可(メッセージによる重複判定が行われる)
091                                                queueInfo.setSqsFifoDedupliId(dedupliId);
092                                        }
093                                }
094                        
095                                // メッセージ
096                                queueInfo.setMessage(message);
097                        
098                                // 完了フラグを送信中:2に更新
099                                updateFgkan(systemId, ykno, "2");
100                                
101                                // メッセージ送信処理
102                                try{
103                                        queueSend.sendMessage(queueInfo);
104                                        
105                                        // 完了フラグを完了:3に更新
106                                        updateFgkan(systemId, ykno, "3");
107                                        
108                                }catch(Exception e) {
109                                        // 完了フラグをエラー:4に更新
110                                        updateError(systemId, ykno, e.getMessage());
111                                } 
112                        }
113                        
114                        // クローズ処理
115                        queueSend.close();
116                        
117                        loopCnt++;
118                }
119        }
120        
121        /**
122         * 完了フラグの更新
123         * 完了フラグを指定された値に更新します。
124         * 
125         * @param systemId システムID
126         * @param ykno 要求番号
127         * @param fgkan 完了フラグ
128         */
129        private void updateFgkan(String systemId, String ykno, String fgkan) {
130                String updSql = "update ge66 set fgkan = ?"
131                                + "where system_id = ? and fgj = '1' and ykno = ?";
132                DBUtil.dbExecute(updSql, new String[] {
133                                fgkan,systemId, ykno}, appInfo, DBID);
134        }
135        
136        /**
137         * エラー状態に更新
138         * 完了フラグをエラー状態に更新して、
139         * エラー情報を格納します。
140         * 
141         * @param systemId システムID
142         * @param ykno 要求番号
143         * @param errMsg エラーメッセージ
144         */
145        private void updateError(String systemId, String ykno, String errMsg) {
146                String updSql = "update ge66 set fgkan = '4', errmsg = ?"
147                                + "where system_id = ? and fgj = '1' and ykno = ?";
148                DBUtil.dbExecute(updSql, new String[] {
149                                errMsg, systemId, ykno
150                }, appInfo, DBID);
151        }
152}