001package org.opengion.plugin.daemon; 002 003import java.util.Date; 004 005import javax.jms.QueueSession; 006 007import org.hsqldb.lib.StringUtil; 008import org.opengion.fukurou.db.DBUtil; 009import org.opengion.fukurou.queue.QueueInfo; 010import org.opengion.fukurou.queue.QueueSend; 011import org.opengion.fukurou.queue.QueueSendFactory; 012import org.opengion.fukurou.util.ApplicationInfo; 013import org.opengion.fukurou.util.HybsTimerTask; 014import org.opengion.hayabusa.common.HybsSystem; 015 016/** 017 * メッセージキュー送信 018 * メッセージキュー送信テーブルを監視して、 019 * 送信処理を行います。 020 * 021 * @og.group メッセージ連携 022 * 023 * @version 5.0 024 * @author oota 025 * @since JDK7 026 * 027 */ 028public class Daemon_QueueSend extends HybsTimerTask { 029 private int loopCnt = 0; 030 private static final int LOOP_COUNTER = 24; 031 private QueueSend queueSend; 032 033 private String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 034 private static final ApplicationInfo appInfo = null; 035 private static final String DBID = HybsSystem.sys( "RESOURCE_DBID" ); 036 private static final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys( "CLOUD_SQS_ACCESS_KEY" ); 037 private static final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys( "CLOUD_SQS_SECRET_KEY" ); 038 039 /** 040 * 開始処理 041 * タイマータスクのデーモン処理の開始ポイントです。 042 */ 043 @Override 044 protected void startDaemon() { 045 if (loopCnt % LOOP_COUNTER == 0) { 046 loopCnt = 1; 047 System.out.println(); 048 System.out.println(toString() + " " + new Date() + ""); 049 } else { 050 // メッセージキュー送信管理テーブルから、送信対象のレコードを取得 051 String sql = "select a.system_id, a.ykno, a.queueid, a.message, a.sfdupid, b.quesyu, b.jmsurl from ge66 a" 052 + " inner join ge65 b on a.system_id = b.system_id and a.queueid = b.queueid and b.fgj = '1'" 053 + " where a.system_id = ? and a.fgkan = '1' and a.fgj = '1'"; 054 String[][] vals = DBUtil.dbExecute(sql, new String[] {SYSTEM_ID}, appInfo, DBID); 055 056 // 取得データ分の繰り返し処理を実行する 057 for(int i = 0; i < vals.length; i++) { 058 String[] record = vals[i]; 059 060 String systemId = record[0]; 061 String ykno = record[1]; 062 String queueId = record[2]; 063 String message = record[3]; 064 String dedupliId = record[4]; 065 String queSyu = record[5]; 066 String jmsUrl = record[6]; 067 068 String queueType = queSyu.toUpperCase(); 069 queueSend = QueueSendFactory.newQueueSend(queueType); 070 071 // 接続処理 072 queueSend.connect(jmsUrl, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY); 073 074 // メッセージ送信管理テーブルから取得したデータを送信実装予定 075 QueueInfo queueInfo = new QueueInfo(); 076 077 // 応答確認種別 078 if("MQ".equals(queueType)){ 079 // MQメッセージサーバ指定時 080 queueInfo.setMqTransacted(false); 081 queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE); 082 // キュー名 083 queueInfo.setMqQueueName(queueId); 084 }else if("SQS".equals(queueType)){ 085 // SQSメッセージサーバ指定時 086 // グループID 087 queueInfo.setSqsFifoGroupId(queueId); 088 if(!StringUtil.isEmpty(dedupliId)) { 089 // 重複排除ID 090 // コンテンツに基づく重複排除が有効時は、未設定でも可(メッセージによる重複判定が行われる) 091 queueInfo.setSqsFifoDedupliId(dedupliId); 092 } 093 } 094 095 // メッセージ 096 queueInfo.setMessage(message); 097 098 // 完了フラグを送信中:2に更新 099 updateFgkan(systemId, ykno, "2"); 100 101 // メッセージ送信処理 102 try{ 103 queueSend.sendMessage(queueInfo); 104 105 // 完了フラグを完了:3に更新 106 updateFgkan(systemId, ykno, "3"); 107 108 }catch(Exception e) { 109 // 完了フラグをエラー:4に更新 110 updateError(systemId, ykno, e.getMessage()); 111 } 112 } 113 114 // クローズ処理 115 queueSend.close(); 116 117 loopCnt++; 118 } 119 } 120 121 /** 122 * 完了フラグの更新 123 * 完了フラグを指定された値に更新します。 124 * 125 * @param systemId システムID 126 * @param ykno 要求番号 127 * @param fgkan 完了フラグ 128 */ 129 private void updateFgkan(String systemId, String ykno, String fgkan) { 130 String updSql = "update ge66 set fgkan = ?" 131 + "where system_id = ? and fgj = '1' and ykno = ?"; 132 DBUtil.dbExecute(updSql, new String[] { 133 fgkan,systemId, ykno}, appInfo, DBID); 134 } 135 136 /** 137 * エラー状態に更新 138 * 完了フラグをエラー状態に更新して、 139 * エラー情報を格納します。 140 * 141 * @param systemId システムID 142 * @param ykno 要求番号 143 * @param errMsg エラーメッセージ 144 */ 145 private void updateError(String systemId, String ykno, String errMsg) { 146 String updSql = "update ge66 set fgkan = '4', errmsg = ?" 147 + "where system_id = ? and fgj = '1' and ykno = ?"; 148 DBUtil.dbExecute(updSql, new String[] { 149 errMsg, systemId, ykno 150 }, appInfo, DBID); 151 } 152}