001/*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016
017package org.opengion.fukurou.queue;
018
019import javax.jms.Connection;
020import javax.jms.JMSException;
021import javax.jms.MessageProducer;
022import javax.jms.Queue;
023import javax.jms.QueueConnectionFactory;
024import javax.jms.QueueSession;
025import javax.jms.Session;
026import javax.jms.TextMessage;
027import javax.naming.Context;
028import javax.naming.InitialContext;
029import javax.naming.NamingException;
030
031import org.apache.activemq.ActiveMQConnection;
032import org.apache.activemq.ActiveMQConnectionFactory;
033
034/**
035 * MQサーバへのメッセージキュー送信用クラス
036 * 
037 * MQサーバへのメッセージキュー送信用のクラスです。
038 * Apache ActiveMQとAmazonMQへの送信が可能です。
039 * tomcatからの送信(JNDI利用)と、
040 * バッチ処理(urlを指定し接続)の2通りが可能です。
041 * 
042 * ※Apache ActiveMQとAmazonMQの切り替えは、
043 * jmsServerの接続先URLを変更するのみで接続の変更が可能です。
044 * (proxy環境からAmazonMqへの接続は行えない場合があります)
045 * 
046 * @og.group メッセージ連携
047 * 
048 * @og.rev 5.10.14.0 (2019/08/01) 新規作成
049 * 
050 * @version 5
051 * @author oota
052 * @since JDK7
053 *
054 */
055public class QueueSend_MQ implements QueueSend {
056        private Connection connection = null;
057        private  Session session = null;
058        private  MessageProducer sender = null;
059        private Context ctx = null;
060        // バッチ用フィールド
061        private  boolean batch = false;
062        private String mqUserId = "";
063        private String mqPassword = "";
064
065        /**
066         * 接続処理
067         * MQサーバに接続を行います。
068         * 
069         * @param jmsServer jmsサーバ接続名(バッチの場合はurl)
070         */
071        public void connect(final String jmsServer) {
072                try {
073                        ctx = new InitialContext();
074                        // 1. Connectionの作成s
075                        QueueConnectionFactory factory = null;
076                        if (batch) {
077                                // バッチ処理の場合。URL指定で、ユーザIDとパスワードを指定して接続。
078                                mqUserId = System.getProperty("mqUserId");
079                                mqPassword = System.getProperty("mqPassword");
080                                factory = new ActiveMQConnectionFactory(jmsServer);
081                                connection = (ActiveMQConnection)factory.createConnection(mqUserId, mqPassword);
082                        } else {
083                                // tomcat接続の場合。JNDIを利用して接続。
084                                factory = (QueueConnectionFactory) ctx.lookup("java:comp/env/" + jmsServer);
085                                connection = (ActiveMQConnection)factory.createConnection();
086                        }
087
088                        // 2. Connectioの開始
089                        connection.start();
090
091                } catch (JMSException jmse) {
092                        throwErrMsg("MQサーバーの接続に失敗しました。" + jmse.getMessage());
093                } catch (NamingException ne) {
094                        throwErrMsg("名前解決に失敗しました。" + ne.getMessage());
095                }
096        }
097
098        /**
099         * 接続処理
100         * MQサーバに接続します。
101         * connect(String jmsServer)と同じ処理になります。
102         *
103         * @og.rev 5.10.15.0 (2019/08/30) 引数追加対応
104         * 
105         * @param jmsServer jmsサーバ情報
106         * @param sqsAccessKey アクセスキー(MQサーバでは未使用)
107         * @param sqsSecretKey シークレットキー(MQサーバでは未使用)
108         */
109        @Override
110        public void connect(String jmsServer, String sqsAccessKey, String sqsSecretKey) {
111                // MQではsqsAccessKeyとsqsSecretKeyは利用しません。
112                connect(jmsServer);
113        }
114
115
116        /**
117         * エラーメッセージ送信
118         * 
119         * @og.rev 5.10.15.0 (2019/08/30) Hybs除外
120         * 
121         * @param errMsg エラーメッセージ
122         */
123        public void throwErrMsg(final String errMsg) {
124                        throw new RuntimeException( errMsg );
125        }
126
127        /**
128         * メッセージ送信
129         * MQサーバにメッセージを送信します。
130         * 
131         * @param queueInfo 送信キュー情報
132         */
133        @Override
134        public void sendMessage(final QueueInfo queueInfo) {
135                try {
136                        // 初期チェック
137                        if (connection == null) {
138                                throwErrMsg("MQサーバに接続されていません。");
139                        }
140
141                        // 1. QueueSessionの作成
142                        session = connection.createSession(queueInfo.isMqTransacted(), queueInfo.getMqAcknowledgeMode());
143                        if (session == null) {
144                                throwErrMsg("キューセッションの生成に失敗しました。");
145                        }
146
147                        // 2. Queueの作成
148                        Queue queue = null;
149                        queue = session.createQueue(queueInfo.getMqQueueName());
150                        sender = session.createProducer(queue);
151
152                        // 3. テキストメッセージの作成
153                        TextMessage msg = session.createTextMessage(queueInfo.getMessage());
154
155                        // 4. 送信処理
156                        sender.send(msg);
157
158                } catch (JMSException e) {
159                        throwErrMsg("キューの送信処理に失敗しました。" + e.getMessage());
160                }
161        }
162
163        /**
164         * クローズ処理
165         * MQサーバとの接続をクローズします。
166         */
167        @Override
168        public void close() {
169                if (ctx != null) {
170                        try {
171                                ctx.close();
172                        } catch (Exception e) {
173                                System.out.println("ctxのクローズに失敗しました。");
174                        }
175                }
176                // 1. sender,session,connectionのクローズ処理
177                if (sender != null) {
178                        try {
179                                sender.close();
180                        } catch (Exception e) {
181                                System.out.println("senderのクローズに失敗しました。");
182                        }
183                }
184                if (session != null) {
185                        try {
186                                session.close();
187                        } catch (Exception e) {
188                                System.out.println("sessionのクローズに失敗しました。");
189                        }
190                }
191                if (connection != null) {
192                        try {
193                                connection.close();
194                        } catch (Exception e) {
195                                System.out.println("connectionのクローズに失敗しました。");
196                        }
197                }
198        }
199
200        /**
201         * バッチ処理判定フラグを設定します。
202         * バッチ処理の場合は引数で接続先情報を与えます。
203         * それ以外の場合(Tomcat)ではJNDIより情報を取得します。
204         * 
205         * @param batchFlg バッチ処理判定フラグ
206         */
207        @Override
208        public void setBatchFlg(final Boolean batchFlg) {
209                batch = batchFlg;
210        }
211
212        /**
213         * テスト用メソッド
214         * テスト実行用です。
215         * 
216         * @param args 引数
217         */
218        public static void main(String[] args) {
219                System.out.println("main start");
220                // 送信情報の設定
221                String url = "tcp://localhost:61616";
222                String queueName = "test01";
223                String msg = "送信メッセージ";
224                
225                QueueInfo queueInfo = new QueueInfo();
226                queueInfo.setMqQueueName(queueName);
227                queueInfo.setMqTransacted(false);
228                queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE);
229                queueInfo.setMessage(msg);
230
231                QueueSend queueSend = new QueueSend_MQ();
232                queueSend.setBatchFlg(true);
233
234                try {
235                        queueSend.connect(url,null,null);
236                        queueSend.sendMessage(queueInfo);
237                } catch (Exception e) {
238                        System.out.println(e.getMessage());
239                } finally {
240                        queueSend.close();
241                }
242
243                System.out.println("main end");
244        }
245}