001/*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016
017package org.opengion.fukurou.queue;
018
019import org.opengion.fukurou.util.StringUtil;
020
021import com.amazonaws.auth.AWSCredentials;
022import com.amazonaws.auth.AWSStaticCredentialsProvider;
023import com.amazonaws.auth.BasicAWSCredentials;
024import com.amazonaws.auth.InstanceProfileCredentialsProvider;
025import com.amazonaws.regions.Regions;
026import com.amazonaws.services.sqs.AmazonSQS;
027import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
028import com.amazonaws.services.sqs.model.SendMessageRequest;
029
030/**
031 * SQSサーバへのメッセージキュー送信クラス
032 * 
033 *  SQSサーバへのメッセージキュー送信用のクラスです。
034 * AmazonSQSへの送信が可能です。
035 * 
036 *  @og.group メッセージ連携
037 *  
038 *  @og.rev 5.10.14.0 (2019/08/01) 新規作成
039 *  
040 * @version 5
041 * @author oota
042 * @since JDK7
043 */
044public class QueueSend_SQS implements QueueSend {
045        String sqsUrl = "";
046        Boolean batch = false;
047        AmazonSQS client;
048
049        /**
050         * 接続処理
051         * SQSサーバに接続を行います。
052         * 
053         * @og.rev 5.10.15.0 (2019/08/30) 引数追加対応
054         *
055         * @param jmsServer 接続先url
056         * @param sqsAccessKey sqsアクセスキー
057         * @param sqsSecretKey sqsシークレットキー
058         */
059        @Override
060        public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) {
061                sqsUrl = jmsServer;
062
063                try {
064                                if (StringUtil.isNull(sqsAccessKey)) {
065                                        // IAMロールによる認証
066                                        client = AmazonSQSClientBuilder.standard()
067                                                        .withCredentials(new InstanceProfileCredentialsProvider(false)).build();
068                                } else {
069                                        AWSCredentials credentials = new BasicAWSCredentials(sqsAccessKey, sqsSecretKey);
070                                        
071// proxy環境でのテスト用。proxyホストの情報を入力して、実行します。
072//                                      ClientConfiguration conf = new ClientConfiguration();
073//                                      conf.setProtocol(Protocol.HTTPS);
074//                                      conf.setProxyHost("mtc-px14");
075//                                      conf.setProxyPort(8081);
076                                        
077                                        client = AmazonSQSClientBuilder.standard()
078                                                        .withCredentials(new AWSStaticCredentialsProvider(credentials))
079//                                                      .withClientConfiguration(conf)
080                                                        .withRegion(Regions.AP_NORTHEAST_1.getName()).build();
081                                }
082                } catch (Exception e) {
083                        throwErrMsg("SQSサーバの接続に失敗しました。" + e.getMessage());
084                }
085        }
086
087        /**
088         * エラーメッセージ送信
089         * 
090         * @og.rev 5.10.15.0 (2019/08/30) hybs除外
091         * 
092         * @param errMsg エラーメッセージ
093         */
094        public void throwErrMsg(final String errMsg) {
095                        throw new RuntimeException( errMsg );
096        }
097
098        /**
099         * メッセージ送信
100         * MQサーバにメッセージキューを送信します。
101         * 
102         * @param info 送信メッセージキュー情報 
103         */
104        @Override
105        public void sendMessage(QueueInfo info) {
106                if(client == null) {
107                        throwErrMsg("SQSサーバに接続されていません。");
108                }
109                
110                SendMessageRequest request = new SendMessageRequest(sqsUrl, info.getMessage());
111
112                /** 情報設定 */
113                // FIFOタイプのみ設定します。
114                // グループID
115                request.setMessageGroupId(info.getSqsFifoGroupId());
116                // 重複禁止ID
117                request.setMessageDeduplicationId(info.getSqsFifoDedupliId());
118
119                /** 送信処理 */
120                try {
121                        client.sendMessage(request);
122                } catch (Exception e) {
123                        throwErrMsg("キューの送信処理に失敗しました。" + e.getMessage());
124                }
125        }
126
127        /**
128         * クローズ処理
129         * SQSサーバとの接続をクローズします。
130         */
131        @Override
132        public void close() {
133                if(client != null) {
134                        client.shutdown();
135                }
136        }
137
138        @Override
139        public void setBatchFlg(final Boolean batchFlg) {
140                // SQSの場合は、バッチとWeb上で共通処理のため、フラグ設定の影響はありません
141                batch = batchFlg;
142        }
143
144        /**
145         * テスト用メソッド
146         * proxy環境下で実行する場合は、
147         * connectメソッドのproxy設定のコメントを外して、
148         * 実行する必要があります。
149         * 
150         * @param args 引数
151         */
152        public static void main(String[] args) {
153                // 接続先
154                String url = System.getProperty("URL");
155                // グループID
156                String groupId = System.getProperty("GROUPID");
157                // アクセスキー
158                String accessKey = System.getProperty("CLOUD_SQS_ACCESS_KEY");
159                // シークレットキー
160                String secretKey = System.getProperty("CLOUD_SQS_SECRET_KEY");
161                // 送信メッセージ(乱数は重複排除IDのテスト用)
162                String message = "サンプル送信メッセージ:" + Math.random();
163                
164                // SQSにメッセージ送信
165                QueueSend queueSend = new QueueSend_SQS();
166                
167                // キュー情報の設定
168                QueueInfo queueInfo = new QueueInfo();
169                
170                queueInfo.setSqsFifoGroupId(groupId);
171
172                // メッセージ
173                queueInfo.setMessage(message);
174                
175                // 接続
176                queueSend.connect(url, accessKey, secretKey);
177                // 送信
178                queueSend.sendMessage(queueInfo);
179                // クローズ
180                queueSend.close();
181        }
182}