1、完善hive安装教程

2、格式化项目和代码符合阿里插件规范
pull/26/head
3y 2 years ago
parent 79ac11fdb3
commit 15d13dc84f

@ -190,7 +190,6 @@ docker exec -it kafka sh
创建两个topic(这里我的**topicName**就叫austinBusiness、austinTraceLog、austinRecall你们可以改成自己的)
```
$KAFKA_HOME/bin/kafka-topics.sh --create --topic austinBusiness --partitions 1 --zookeeper zookeeper:2181 --replication-factor 1
$KAFKA_HOME/bin/kafka-topics.sh --create --topic austinTraceLog --partitions 1 --zookeeper zookeeper:2181 --replication-factor 1
@ -558,8 +557,343 @@ services:
- TZ=Asia/Shanghai
```
## 10、HIVE
部署Flink也是直接上docker-compose就完事了
1、把仓库拉到自己的服务器上
```shell
git clone git@github.com:big-data-europe/docker-hive.git
```
2、进入到项目的文件夹里
```shell
cd docker-hive
```
3、微调下docker-compose文件内容如下主要是增加了几个通信的端口
```yml
version: "3"
services:
namenode:
image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
volumes:
- namenode:/hadoop/dfs/name
environment:
- CLUSTER_NAME=test
env_file:
- ./hadoop-hive.env
ports:
- "50070:50070"
- "9000:9000"
- "8020:8020"
datanode:
image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
volumes:
- datanode:/hadoop/dfs/data
env_file:
- ./hadoop-hive.env
environment:
SERVICE_PRECONDITION: "namenode:50070"
ports:
- "50075:50075"
- "50010:50010"
- "50020:50020"
hive-server:
image: bde2020/hive:2.3.2-postgresql-metastore
env_file:
- ./hadoop-hive.env
environment:
HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
SERVICE_PRECONDITION: "hive-metastore:9083"
ports:
- "10000:10000"
hive-metastore:
image: bde2020/hive:2.3.2-postgresql-metastore
env_file:
- ./hadoop-hive.env
command: /opt/hive/bin/hive --service metastore
environment:
SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
ports:
- "9083:9083"
hive-metastore-postgresql:
image: bde2020/hive-metastore-postgresql:2.3.0
ports:
- "5432:5432"
presto-coordinator:
image: shawnzhu/prestodb:0.181
ports:
- "8080:8080"
volumes:
namenode:
datanode:
```
4、最后我们可以连上`hive`的客户端,感受下快速安装好`hive`的成功感。
```shell
# 进入bash
docker-compose exec hive-server bash
# 使用beeline客户端连接
/opt/hive/bin/beeline -u jdbc:hive2://localhost:10000
```
## 11、FLINK和HIVE融合
实时流处理的flink用的是docker-compose进行部署而与hive融合的flink我这边是正常的姿势安装主要是涉及的环境很多用docker-compose就相对没那么方便了
### 11.1 安装flink环境
1、下载`flink`压缩包
```shell
wget https://dlcdn.apache.org/flink/flink-1.16.0/flink-1.16.0-bin-scala_2.12.tgz
```
2、解压`flink`
```shell
tar -zxf flink-1.16.0-bin-scala_2.12.tgz
```
3、修改该目录下的`conf`下的`flink-conf.yaml`文件中`rest.bind-address`配置,不然**远程访问不到**`8081`端口,将其改为`0.0.0.0`
```shell
rest.bind-address: 0.0.0.0
```
4、将`flink`官网提到连接`hive`所需要的`jar`包下载到`flink`的`lib`目录下(一共4个)
```shell
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9_2.12/1.16.0/flink-sql-connector-hive-2.3.9_2.12-1.16.0.jar
wget https://repo.maven.apache.org/maven2/org/apache/hive/hive-exec/2.3.4/hive-exec-2.3.4.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.16.0/flink-connector-hive_2.12-1.16.0.jar
wget https://repo.maven.apache.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar
```
5、按照官网指示把`flink-table-planner_2.12-1.16.0.jar`和`flink-table-planner-loader-1.16.0.jar` 这俩个`jar`包移动其目录;
```shell
mv $FLINK_HOME/opt/flink-table-planner_2.12-1.16.0.jar $FLINK_HOME/lib/flink-table-planner_2.12-1.16.0.jar
mv $FLINK_HOME/lib/flink-table-planner-loader-1.16.0.jar $FLINK_HOME/opt/flink-table-planner-loader-1.16.0.jar
```
6、把后续`kafka`所需要的依赖也下载到`lib`目录下
```shell
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.16.0/flink-connector-kafka-1.16.0.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.3.1/kafka-clients-3.3.1.jar
```
7、把工程下的`hive-site.xml`文件拷贝到`$FLINK_HOME/conf`下,内容如下(**hive_ip**自己变动)
```xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:postgresql://hive_ip:5432/metastore?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>org.postgresql.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hive_ip:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.
</description>
</property>
<property>
<name>datanucleus.schema.autoCreateAll</name>
<value>true</value>
</property>
</configuration>
```
### 11.2 安装hadoop环境
由于`hive`的镜像已经锁死了`hadoop`的版本为`2.7.4`,所以我这边`flink`所以来的`hadoop`也是下载`2.7.4`版本
1、下载`hadoop`压缩包
```shell
wget https://archive.apache.org/dist/hadoop/common/hadoop-2.7.4/hadoop-2.7.4.tar.gz
```
2、解压`hadoop`
```shell
tar -zxf hadoop-2.7.4.tar.gz
```
3、`hadoop`的配置文件`hdfs-site.xml`增加以下内容(我的目录在`/root/hadoop-2.7.4/etc/hadoop`
```xml
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>true</value>
<description>only cofig in clients</description>
</property>
```
### 11.3 安装jdk11
由于高版本的`flink`需要`jdk 11`,所以这边安装下该版本的`jdk`
```shell
yum install java-11-openjdk.x86_64
yum install java-11-openjdk-devel.x86_64
```
### 11.4 配置jdk、hadoop的环境变量
这一步为了能让`flink`在启动的时候,加载到`jdk`和`hadoop`的环境。
1、编辑`/etc/profile`文件
```shell
vim /etc/profile
```
2、文件内容最底下增加以下配置
```shell
JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.17.0.8-2.el7_9.x86_64
JRE_HOME=$JAVA_HOME/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH
export HADOOP_HOME=/root/hadoop-2.7.4
export PATH=$HADOOP_HOME/bin:$PATH
export HADOOP_CLASSPATH=`hadoop classpath`
```
3、让配置文件生效
```shell
source /etc/profile
```
### 11.5 增加hosts进行通信flink和namenode/datanode之间
在部署`flink`服务器上增加`hosts`,有以下(`ip`为部署`hive`的地址):
```shell
127.0.0.1 namenode
127.0.0.1 datanode
127.0.0.1 b2a0f0310722
```
其中 `b2a0f0310722`是`datanode`的主机名,该主机名会随着`hive`的`docker`而变更,我们可以登录`namenode`的后台地址找到其主机名。而方法则是在部署`hive`的地址输入:
```
http://localhost:50070/dfshealth.html#tab-datanode
```
![](https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/384425d102464c059d462add377b4582~tplv-k3u1fbpfcp-watermark.image?)
### 11.6 启动flink调试kafka数据到hive
启动`flink-sql`的客户端:
```shell
./sql-client.sh
```
在`sql`客户端下执行以下脚本命令,注:`hive-conf-dir`要放在`$FLINK_HOME/conf`下
```shell
CREATE CATALOG my_hive WITH (
'type' = 'hive',
'hive-conf-dir' = '/root/flink-1.16.0/conf'
);
```
```shell
use catalog my_hive;
```
```shell
create database austin;
```
重启`flink`集群
```shell
./stop-cluster.sh
```
```shell
./start-cluster.sh
```
重新提交执行`flink`任务
```shell
./flink run austin-data-house-0.0.1-SNAPSHOT.jar
```
启动消费者的命令(将`ip`和`port`改为自己服务器所部署的Kafka信息
```shell
$KAFKA_HOME/bin/kafka-console-producer.sh --topic austinTraceLog --broker-list ip:port
```
输入测试数据:
```json
{"state":"1","businessId":"2","ids":[1,2,3],"logTimestamp":"123123"}
```
## 12、安装METABASE
部署`Metabase`很简单,也是使用`docker`进行安装部署,就两行命令(后续我会将其加入到`docker-compose`里面)。
```shell
docker pull metabase/metabase:latest
```
```shell
docker run -d -p 5001:3000 --name metabase metabase/metabase
```
完了之后,我们就可以打开`5001`端口到`Metabase`的后台了。
## 10、未完待续
## 13、未完待续
安装更详细的过程以及整个文章系列的更新思路都在公众号**Java3y**连载哟!

@ -181,8 +181,9 @@ austin项目**强依赖**`MySQL`/`Redis`/(**大概需要2G内存**)**弱依
- [x] 测试环境完成微信服务号扫码登录功能
- [x] 测试环境docker-compose完成接入MySQL/Redis/Flink/xxl-job/Kafka
- [x] 在线演示第一版发布
- [x] 数据仓库模块完成
- [ ] 总体架构已完成,持续做基础建设和优化代码
**近期更新时间**12月29号
**近期更新时间**2023年1月29号
**近期更新功能**完善企业微信(应用消息)和企业微信(机器人)所支持的消息类型,以及素材的上传
**近期更新功能**数据仓库模块`austin-data-house`调试成功

@ -8,11 +8,6 @@ package com.java3y.austin.common.constant;
*/
public class AustinConstant {
/**
*
*/
public static final String ORIGIN_VALUE = "http://localhost:3000";
/**
* businessId
* com.java3y.austin.support.utils.TaskInfoUtils#generateBusinessId(java.lang.Long, java.lang.Integer)
@ -42,6 +37,4 @@ public class AustinConstant {
public static final String DEFAULT_AUDITOR = "Java3y";
}

@ -80,7 +80,4 @@ public class CommonConstant {
public final static String ENV_TEST = "test";
}

@ -44,5 +44,4 @@ public class OfficialAccountParamConstant {
public static final String UNSUBSCRIBE_TIPS = "老乡别走!";
}

@ -3,6 +3,7 @@ package com.java3y.austin.common.constant;
/**
*
*
* @author 3y
*/
public class SendAccountConstant {

@ -9,6 +9,7 @@ import java.util.Set;
/**
*
*
* @author 3y
*/
@Data

@ -6,8 +6,8 @@ import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
*
*
* @author 3y
*/
@Data

@ -6,7 +6,6 @@ import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
*
* <p>
*

@ -6,7 +6,6 @@ import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
*
* <p>
*

@ -7,7 +7,7 @@ import lombok.NoArgsConstructor;
/**
*
*
* <p>
*
* [{"sms_20":{"url":"https://sms.yunpian.com/v2/sms/tpl_batch_send.json","apikey":"ca55d4c8544444444444622221b5cd7","tpl_id":"533332222282","supplierId":20,"supplierName":"云片"}}]
*

@ -15,7 +15,7 @@ import java.util.Map;
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class AlipayMiniProgramContentModel extends ContentModel{
public class AlipayMiniProgramContentModel extends ContentModel {
/**
*

@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
/**
* @author 3y
*
* <p>
*
*/
@Data

@ -9,7 +9,7 @@ import lombok.NoArgsConstructor;
* @author 3y
* <p>
*
*
* <p>
* urlcontent
*/
@Data

@ -15,16 +15,47 @@ import lombok.ToString;
@AllArgsConstructor
public enum AnchorState {
/**
*
*/
RECEIVE(10, "消息接收成功"),
/**
* Kafka
*/
DISCARD(20, "消费被丢弃"),
/**
*
*/
NIGHT_SHIELD(22, "夜间屏蔽"),
/**
* 9
*/
NIGHT_SHIELD_NEXT_SEND(24, "夜间屏蔽(次日早上9点发送)"),
/**
* 5min
*/
CONTENT_DEDUPLICATION(30, "消息被内容去重"),
/**
*
*/
RULE_DEDUPLICATION(40, "消息被频次去重"),
/**
*
*/
WHITE_LIST(50, "白名单过滤"),
/**
*
*/
SEND_SUCCESS(60, "消息下发成功"),
/**
*
*/
SEND_FAIL(70, "消息下发失败"),
/**
*
*/
CLICK(0100, "消息被点击"),
;

@ -15,10 +15,16 @@ import lombok.ToString;
public enum AuditStatus {
/**
* 10. 20. 30.'
* 10.
*/
WAIT_AUDIT(10, "待审核"),
/**
* 20.
*/
AUDIT_SUCCESS(20, "审核成功"),
/**
* 30.'
*/
AUDIT_REJECT(30, "被拒绝");
private final Integer code;

@ -17,18 +17,54 @@ import lombok.ToString;
public enum ChannelType {
/**
* IM() --
*/
IM(10, "IM(站内信)", ImContentModel.class, "im"),
/**
* push() --
*/
PUSH(20, "push(通知栏)", PushContentModel.class, "push"),
/**
* sms() --
*/
SMS(30, "sms(短信)", SmsContentModel.class, "sms"),
/**
* email() -- QQ163
*/
EMAIL(40, "email(邮件)", EmailContentModel.class, "email"),
/**
* officialAccounts() --
*/
OFFICIAL_ACCOUNT(50, "officialAccounts(服务号)", OfficialAccountsContentModel.class, "official_accounts"),
/**
* miniProgram()
*/
MINI_PROGRAM(60, "miniProgram(小程序)", MiniProgramContentModel.class, "mini_program"),
/**
* enterpriseWeChat()
*/
ENTERPRISE_WE_CHAT(70, "enterpriseWeChat(企业微信)", EnterpriseWeChatContentModel.class, "enterprise_we_chat"),
/**
* dingDingRobot()
*/
DING_DING_ROBOT(80, "dingDingRobot(钉钉机器人)", DingDingRobotContentModel.class, "ding_ding_robot"),
/**
* dingDingWorkNotice()
*/
DING_DING_WORK_NOTICE(90, "dingDingWorkNotice(钉钉工作通知)", DingDingWorkContentModel.class, "ding_ding_work_notice"),
/**
* enterpriseWeChat()
*/
ENTERPRISE_WE_CHAT_ROBOT(100, "enterpriseWeChat(企业微信机器人)", EnterpriseWeChatRobotContentModel.class, "enterprise_we_chat_robot"),
/**
* feiShuRoot()
*/
FEI_SHU_ROBOT(110, "feiShuRoot(飞书机器人)", FeiShuRobotContentModel.class, "fei_shu_robot"),
ALIPAY_MINI_PROGRAM(120,"alipayMiniProgram(支付宝小程序)",AlipayMiniProgramContentModel.class,"alipay_mini_program"),
/**
* alipayMiniProgram()
*/
ALIPAY_MINI_PROGRAM(120, "alipayMiniProgram(支付宝小程序)", AlipayMiniProgramContentModel.class, "alipay_mini_program"),
;
/**
@ -53,6 +89,7 @@ public enum ChannelType {
/**
* codeclass
*
* @param code
* @return
*/
@ -68,6 +105,7 @@ public enum ChannelType {
/**
* codeenum
*
* @param code
* @return
*/

@ -9,6 +9,7 @@ import java.util.List;
/**
*
*
* @author 3y
*/
@Getter
@ -16,7 +17,14 @@ import java.util.List;
@AllArgsConstructor
public enum DeduplicationType {
/**
*
*/
CONTENT(10, "N分钟相同内容去重"),
/**
*
*/
FREQUENCY(20, "一天内N次相同渠道去重"),
;
private final Integer code;
@ -25,6 +33,7 @@ public enum DeduplicationType {
/**
*
*
* @return
*/
public static List<Integer> getDeduplicationList() {

@ -14,9 +14,21 @@ import lombok.ToString;
@ToString
@AllArgsConstructor
public enum FileType {
/**
*
*/
IMAGE("10", "image"),
/**
*
*/
VOICE("20", "voice"),
/**
*
*/
COMMON_FILE("30", "file"),
/**
*
*/
VIDEO("40", "video"),
;
private final String code;

@ -14,14 +14,41 @@ import lombok.ToString;
@ToString
@AllArgsConstructor
public enum IdType {
/**
* userId
*/
USER_ID(10, "userId"),
/**
*
*/
DID(20, "did"),
/**
*
*/
PHONE(30, "phone"),
/**
* openId
*/
OPEN_ID(40, "openId"),
/**
*
*/
EMAIL(50, "email"),
/**
* userId
*/
ENTERPRISE_USER_ID(60, "enterprise_user_id"),
/**
* userId
*/
DING_DING_USER_ID(70, "ding_ding_user_id"),
/**
* cid
*/
CID(80, "cid"),
/**
* userId
*/
FEI_SHU_USER_ID(90, "fei_shu_user_id"),
;

@ -14,14 +14,32 @@ import lombok.ToString;
public enum MessageStatus {
/**
* 10. 20. 30. 40. 50. 60. 70.
* 10.
*/
INIT(10, "初始化状态"),
/**
* 20.
*/
STOP(20, "停用"),
/**
* 30.
*/
RUN(30, "启用"),
/**
* 40.
*/
PENDING(40, "等待发送"),
/**
* 50.
*/
SENDING(50, "发送中"),
/**
* 60.
*/
SEND_SUCCESS(60, "发送成功"),
/**
* 70.
*/
SEND_FAIL(70, "发送失败");
private final Integer code;

@ -6,6 +6,7 @@ import lombok.ToString;
/**
*
*
* @author 3y
*/
@Getter
@ -13,10 +14,18 @@ import lombok.ToString;
@AllArgsConstructor
public enum MessageType {
NOTICE(10,"通知类消息","notice"),
MARKETING(20,"营销类消息","marketing"),
AUTH_CODE(30,"验证码消息","auth_code")
;
/**
*
*/
NOTICE(10, "通知类消息", "notice"),
/**
*
*/
MARKETING(20, "营销类消息", "marketing"),
/**
*
*/
AUTH_CODE(30, "验证码消息", "auth_code");
/**
*
@ -37,6 +46,7 @@ public enum MessageType {
/**
* codeenum
*
* @param code
* @return
*/

@ -14,24 +14,70 @@ import lombok.ToString;
@AllArgsConstructor
public enum SendMessageType {
TEXT("10", "文本", "text", "text","text","text"),
VOICE("20", "语音", null, "voice",null,null),
VIDEO("30", "视频", null, null,null,null),
NEWS("40", "图文", "feedCard", null,"news",null),
TEXT_CARD("50", "文本卡片", null, null,null,null),
FILE("60", "文件", null, "file","file",null),
MINI_PROGRAM_NOTICE("70", "小程序通知", null, null,null,null),
MARKDOWN("80", "markdown", "markdown", "markdown","markdown",null),
TEMPLATE_CARD("90", "模板卡片", null, null,"template_card",null),
IMAGE("100", "图片", null, "image","image","image"),
LINK("110", "链接消息", "link", "link",null,null),
ACTION_CARD("120", "跳转卡片消息", "actionCard", "action_card",null,"interactive"),
OA("130", "OA消息", null, "oa",null,null),
MP_NEWS("140", "图文消息(mpNews)", null, null,null,null),
RICH_TEXT("150", "富文本", null, null,null,"post"),
SHARE_CHAT("160", "群名片", null, null,null,"share_chat")
;
/**
*
*/
TEXT("10", "文本", "text", "text", "text", "text"),
/**
*
*/
VOICE("20", "语音", null, "voice", null, null),
/**
*
*/
VIDEO("30", "视频", null, null, null, null),
/**
*
*/
NEWS("40", "图文", "feedCard", null, "news", null),
/**
*
*/
TEXT_CARD("50", "文本卡片", null, null, null, null),
/**
*
*/
FILE("60", "文件", null, "file", "file", null),
/**
*
*/
MINI_PROGRAM_NOTICE("70", "小程序通知", null, null, null, null),
/**
* markdown
*/
MARKDOWN("80", "markdown", "markdown", "markdown", "markdown", null),
/**
*
*/
TEMPLATE_CARD("90", "模板卡片", null, null, "template_card", null),
/**
*
*/
IMAGE("100", "图片", null, "image", "image", "image"),
/**
*
*/
LINK("110", "链接消息", "link", "link", null, null),
/**
*
*/
ACTION_CARD("120", "跳转卡片消息", "actionCard", "action_card", null, "interactive"),
/**
* OA
*/
OA("130", "OA消息", null, "oa", null, null),
/**
* (mpNews)
*/
MP_NEWS("140", "图文消息(mpNews)", null, null, null, null),
/**
*
*/
RICH_TEXT("150", "富文本", null, null, null, "post"),
/**
*
*/
SHARE_CHAT("160", "群名片", null, null, null, "share_chat");
private final String code;
private final String description;
@ -101,6 +147,7 @@ public enum SendMessageType {
}
return null;
}
/**
* codeType
*

@ -15,14 +15,21 @@ import lombok.ToString;
public enum ShieldType {
/**
*
*/
NIGHT_NO_SHIELD(10, "夜间不屏蔽"),
/**
* --
*/
NIGHT_SHIELD(20, "夜间屏蔽"),
/**
* (9) --
*/
NIGHT_SHIELD_BUT_NEXT_DAY_SEND(30, "夜间屏蔽(次日早上9点发送)");
private final Integer code;
private final String description;
}

@ -6,6 +6,7 @@ import lombok.ToString;
/**
*
*
* @author 3y
*/
@Getter
@ -13,9 +14,21 @@ import lombok.ToString;
@AllArgsConstructor
public enum SmsStatus {
SEND_SUCCESS(10,"调用渠道接口发送成功"),
RECEIVE_SUCCESS(20,"用户收到短信(收到渠道短信回执,状态成功)"),
/**
*
*/
SEND_SUCCESS(10, "调用渠道接口发送成功"),
/**
* ()
*/
RECEIVE_SUCCESS(20, "用户收到短信(收到渠道短信回执,状态成功)"),
/**
* ()
*/
RECEIVE_FAIL(30, "用户收不到短信(收到渠道短信回执,状态失败)"),
/**
*
*/
SEND_FAIL(40, "调用渠道接口发送失败");
private final Integer code;
@ -24,6 +37,7 @@ public enum SmsStatus {
/**
*
*
* @param code
* @return
*/

@ -7,6 +7,7 @@ import lombok.ToString;
/**
*
*
* @author 3y
*/
@Getter
@ -15,14 +16,21 @@ import lombok.ToString;
public enum SmsSupplier {
TENCENT(10,"腾讯渠道商"),
YUN_PAIN(20,"云片渠道商");
/**
*
*/
TENCENT(10, "腾讯渠道商"),
/**
*
*/
YUN_PAIN(20, "云片渠道商");
private final Integer code;
private final String description;
/**
*
*
* @param code
* @return
*/

@ -6,6 +6,7 @@ import lombok.ToString;
/**
*
*
* @author 3y
*/
@Getter
@ -13,7 +14,13 @@ import lombok.ToString;
@AllArgsConstructor
public enum TemplateType {
/**
* ()
*/
CLOCKING(10, "定时类的模板(后台定时调用)"),
/**
* ()
*/
REALTIME(20, "实时类的模板(接口实时调用)"),
;

@ -27,6 +27,7 @@ public class CronAsyncThreadPoolConfig {
* pending线
* 线线线
* 线Springfalse
*
* @return
*/
public static ExecutorService getConsumePendingThreadPool() {

@ -1,10 +1,5 @@
package com.java3y.austin.cron.dto.getui;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.http.ContentType;
import cn.hutool.http.Header;
import cn.hutool.http.HttpRequest;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.AllArgsConstructor;
import lombok.Builder;
@ -14,6 +9,7 @@ import lombok.NoArgsConstructor;
/**
* token
*
* @author 3y
* https://docs.getui.com/getui/server/rest_v2/token/
*/

@ -13,6 +13,7 @@ import org.springframework.stereotype.Service;
/**
*
*
* @author 3y
*/
@Service

@ -19,7 +19,7 @@ import java.util.Arrays;
/**
*
*
* <p>
* example:austin19
*
* @author 3y

@ -7,7 +7,6 @@ import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiGettokenRequest;
import com.dingtalk.api.response.OapiGettokenResponse;
import com.google.common.base.Throwables;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.constant.CommonConstant;
import com.java3y.austin.common.constant.SendAccountConstant;
import com.java3y.austin.common.dto.account.DingDingWorkNoticeAccount;
@ -15,7 +14,6 @@ import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.support.config.SupportThreadPoolConfig;
import com.java3y.austin.support.dao.ChannelAccountDao;
import com.java3y.austin.support.domain.ChannelAccount;
import com.java3y.austin.support.utils.AccountUtils;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

@ -7,7 +7,6 @@ import cn.hutool.http.Header;
import cn.hutool.http.HttpRequest;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.constant.CommonConstant;
import com.java3y.austin.common.constant.SendAccountConstant;
import com.java3y.austin.common.dto.account.GeTuiAccount;
@ -17,7 +16,6 @@ import com.java3y.austin.cron.dto.getui.QueryTokenParamDTO;
import com.java3y.austin.support.config.SupportThreadPoolConfig;
import com.java3y.austin.support.dao.ChannelAccountDao;
import com.java3y.austin.support.domain.ChannelAccount;
import com.java3y.austin.support.utils.AccountUtils;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

@ -11,6 +11,7 @@ import java.util.Map;
/**
* csv
*
* @author 3y
* @date 2022/2/9
*/

@ -13,7 +13,7 @@ import org.springframework.context.annotation.Configuration;
*/
@Slf4j
@Configuration
@ConditionalOnProperty(name = "austin.xxl.job.enabled",havingValue = "true")
@ConditionalOnProperty(name = "austin.xxl.job.enabled", havingValue = "true")
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")

@ -12,6 +12,7 @@ import java.util.Date;
/**
* xxl
*
* @author 3y
*/
@Data

@ -2,6 +2,7 @@ package com.java3y.austin.cron.xxl.enums;
/**
*
*
* @author 3y
*/
public enum ExecutorBlockStrategyEnum {

@ -3,19 +3,50 @@ package com.java3y.austin.cron.xxl.enums;
/**
*
*
* @author 3y
*/
public enum ExecutorRouteStrategyEnum {
/**
* FIRST
*/
FIRST,
/**
* LAST
*/
LAST,
/**
* ROUND
*/
ROUND,
/**
* RANDOM
*/
RANDOM,
/**
* CONSISTENT_HASH
*/
CONSISTENT_HASH,
/**
* LEAST_FREQUENTLY_USED
*/
LEAST_FREQUENTLY_USED,
/**
* LEAST_RECENTLY_USED
*/
LEAST_RECENTLY_USED,
/**
* FAILOVER
*/
FAILOVER,
/**
* BUSYOVER
*/
BUSYOVER,
/**
* SHARDING_BROADCAST
*/
SHARDING_BROADCAST;
ExecutorRouteStrategyEnum() {

@ -2,16 +2,38 @@ package com.java3y.austin.cron.xxl.enums;
/**
* GlueTyp BEAN)
*
* @author 3y
*/
public enum GlueTypeEnum {
/**
* BEAN
*/
BEAN,
/**
* GLUE_GROOVY
*/
GLUE_GROOVY,
/**
* GLUE_SHELL
*/
GLUE_SHELL,
/**
* GLUE_PYTHON
*/
GLUE_PYTHON,
/**
* GLUE_PHP
*/
GLUE_PHP,
/**
* GLUE_NODEJS
*/
GLUE_NODEJS,
/**
* GLUE_POWERSHELL
*/
GLUE_POWERSHELL;
GlueTypeEnum() {

@ -2,6 +2,7 @@ package com.java3y.austin.cron.xxl.enums;
/**
*
*
* @author 3y
*/
public enum MisfireStrategyEnum {

@ -7,6 +7,9 @@ package com.java3y.austin.cron.xxl.enums;
*/
public enum ScheduleTypeEnum {
/**
* NONE
*/
NONE,
/**
* schedule by cron

@ -1,11 +1,12 @@
package com.java3y.austin.cron.xxl.service;
import com.java3y.austin.common.vo.BasicResultVO;
import com.java3y.austin.cron.xxl.entity.XxlJobGroup;
import com.java3y.austin.cron.xxl.entity.XxlJobInfo;
import com.java3y.austin.common.vo.BasicResultVO;
/**
*
*
* @author 3y
*/
public interface CronTaskService {
@ -13,8 +14,8 @@ public interface CronTaskService {
/**
* /
* @param xxlJobInfo
*
* @param xxlJobInfo
* @return Id
*/
BasicResultVO saveCronTask(XxlJobInfo xxlJobInfo);
@ -47,6 +48,7 @@ public interface CronTaskService {
/**
* Id
*
* @param appName
* @param title
* @return BasicResultVO
@ -55,6 +57,7 @@ public interface CronTaskService {
/**
*
*
* @param xxlJobGroup
* @return BasicResultVO
*/

@ -6,12 +6,12 @@ import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import com.java3y.austin.common.enums.RespStatusEnum;
import com.java3y.austin.common.vo.BasicResultVO;
import com.java3y.austin.cron.xxl.constants.XxlJobConstant;
import com.java3y.austin.cron.xxl.entity.XxlJobGroup;
import com.java3y.austin.cron.xxl.entity.XxlJobInfo;
import com.java3y.austin.common.enums.RespStatusEnum;
import com.java3y.austin.cron.xxl.service.CronTaskService;
import com.java3y.austin.common.vo.BasicResultVO;
import com.xxl.job.core.biz.model.ReturnT;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;

@ -2,7 +2,6 @@ package com.java3y.austin.cron.xxl.utils;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.constant.CommonConstant;
import com.java3y.austin.common.enums.RespStatusEnum;
import com.java3y.austin.common.vo.BasicResultVO;
@ -77,6 +76,7 @@ public class XxlJobUtils {
/**
* jobGroupId
*
* @return
*/
private Integer queryJobGroupId() {

@ -30,7 +30,7 @@ public class AustinHiveBootStrap {
// 2.创建Kafka源表
String kafkaSourceTableCreate = "DROP TABLE IF EXISTS " + DataHouseConstant.CATALOG_DEFAULT_DATABASE + "." + DataHouseConstant.KAFKA_SOURCE_TABLE_NAME;
tableEnv.executeSql(kafkaSourceTableCreate);
String kafkaSourceTableDDL = "CREATE TABLE <CATALOG_DEFAULT_DATABASE>.<KAFKA_SOURCE_TABLE_NAME> (\n" +
String kafkaSourceTableDdl = "CREATE TABLE <CATALOG_DEFAULT_DATABASE>.<KAFKA_SOURCE_TABLE_NAME> (\n" +
"`ids` String,\n" +
"`state` String,\n" +
"`businessId` String,\n" +
@ -45,17 +45,17 @@ public class AustinHiveBootStrap {
" 'json.ignore-parse-errors' = 'false',\n" +
" 'scan.topic-partition-discovery.interval'='1s',\n" +
" 'scan.startup.mode' = 'latest-offset')";
kafkaSourceTableDDL = kafkaSourceTableDDL.replace("<CATALOG_DEFAULT_DATABASE>", DataHouseConstant.CATALOG_DEFAULT_DATABASE)
kafkaSourceTableDdl = kafkaSourceTableDdl.replace("<CATALOG_DEFAULT_DATABASE>", DataHouseConstant.CATALOG_DEFAULT_DATABASE)
.replace("<KAFKA_SOURCE_TABLE_NAME>", DataHouseConstant.KAFKA_SOURCE_TABLE_NAME)
.replace("<KAFKA_IP_PORT>", DataHouseConstant.KAFKA_IP_PORT)
.replace("<KAFKA_TOPIC>", DataHouseConstant.KAFKA_TOPIC);
tableEnv.executeSql(kafkaSourceTableDDL);
tableEnv.executeSql(kafkaSourceTableDdl);
// 创建写入hive的表
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("DROP TABLE IF EXISTS " + DataHouseConstant.CATALOG_DEFAULT_DATABASE + "." + DataHouseConstant.KAFKA_SINK_TABLE_NAME);
String kafkaSinkTableDDL = "CREATE TABLE IF NOT EXISTS <CATALOG_DEFAULT_DATABASE>.<KAFKA_SINK_TABLE_NAME> (\n" +
String kafkaSinkTableDdl = "CREATE TABLE IF NOT EXISTS <CATALOG_DEFAULT_DATABASE>.<KAFKA_SINK_TABLE_NAME> (\n" +
"`ids` String,\n" +
"`state` String,\n" +
"`businessId` String,\n" +
@ -68,9 +68,9 @@ public class AustinHiveBootStrap {
" 'sink.buffer-flush.max-rows'='10',\n" +
" 'sink.buffer-flush.interval' = '5s'\n" +
")";
kafkaSinkTableDDL = kafkaSinkTableDDL.replace("<CATALOG_DEFAULT_DATABASE>", DataHouseConstant.CATALOG_DEFAULT_DATABASE)
kafkaSinkTableDdl = kafkaSinkTableDdl.replace("<CATALOG_DEFAULT_DATABASE>", DataHouseConstant.CATALOG_DEFAULT_DATABASE)
.replace("<KAFKA_SINK_TABLE_NAME>", DataHouseConstant.KAFKA_SINK_TABLE_NAME);
tableEnv.executeSql(kafkaSinkTableDDL);
tableEnv.executeSql(kafkaSinkTableDdl);
// 3. 将kafka_source 数据写入到kafka_sink 完成
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

@ -2,8 +2,6 @@ package com.java3y.austin.handler.alipay.impl;
import com.alipay.api.AlipayApiException;
import com.alipay.api.AlipayClient;
import com.alipay.api.AlipayConfig;
import com.alipay.api.DefaultAlipayClient;
import com.alipay.api.domain.AlipayOpenAppMiniTemplatemessageSendModel;
import com.alipay.api.request.AlipayOpenAppMiniTemplatemessageSendRequest;
import com.java3y.austin.common.dto.account.AlipayMiniProgramAccount;
@ -50,11 +48,11 @@ public class AlipayMiniProgramAccountServiceImpl implements AlipayMiniProgramAcc
/**
*
*/
private List<AlipayOpenAppMiniTemplatemessageSendRequest> assembleReq(AlipayMiniProgramParam alipayMiniProgramParam, AlipayMiniProgramAccount alipayMiniProgramAccount){
private List<AlipayOpenAppMiniTemplatemessageSendRequest> assembleReq(AlipayMiniProgramParam alipayMiniProgramParam, AlipayMiniProgramAccount alipayMiniProgramAccount) {
Set<String> receiver = alipayMiniProgramParam.getToUserId();
List<AlipayOpenAppMiniTemplatemessageSendRequest> requestList = new ArrayList<>(receiver.size());
for(String toUserId : receiver){
for (String toUserId : receiver) {
AlipayOpenAppMiniTemplatemessageSendRequest request = new AlipayOpenAppMiniTemplatemessageSendRequest();
AlipayOpenAppMiniTemplatemessageSendModel model = new AlipayOpenAppMiniTemplatemessageSendModel();
model.setToUserId(toUserId);

@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit;
public class HandlerThreadPoolConfig {
private static final String PRE_FIX = "austin.";
/**
* 线
* 线keepAliveTime()

@ -1,6 +1,5 @@
package com.java3y.austin.handler.deduplication;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.constant.CommonConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.DeduplicationType;

@ -3,7 +3,8 @@ package com.java3y.austin.handler.deduplication.limit;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.service.AbstractDeduplicationService;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
/**
* @author cao

@ -15,6 +15,7 @@ public interface LimitService {
/**
*
*
* @param service
* @param taskInfo
* @param param

@ -1,7 +1,6 @@
package com.java3y.austin.handler.deduplication.limit;
import cn.hutool.core.collection.CollUtil;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.constant.CommonConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
@ -15,6 +14,7 @@ import java.util.stream.Collectors;
/**
*
*
* @author cao
* @date 2022-04-20 13:41
*/

@ -18,6 +18,7 @@ import java.util.Set;
/**
* rediszset
*
* @author cao
* @date 2022-04-20 11:34
*/

@ -11,7 +11,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.Set;
/**
* @author 3y

@ -11,6 +11,7 @@ public interface DeduplicationService {
/**
*
*
* @param param
*/
void deduplication(DeduplicationParam param);

@ -2,7 +2,6 @@ package com.java3y.austin.handler.discard;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.constant.CommonConstant;
import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.common.domain.TaskInfo;
@ -14,6 +13,7 @@ import org.springframework.stereotype.Service;
/**
*
*
* @author 3y.
*/
@Service
@ -29,6 +29,7 @@ public class DiscardMessageService {
/**
* apollo
*
* @param taskInfo
* @return
*/

@ -1,5 +1,6 @@
package com.java3y.austin.handler.domain.dingding;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@ -9,7 +10,7 @@ import java.util.List;
/**
*
*
* <p>
* https://open.dingtalk.com/document/group/custom-robot-access
*
* @author 3y
@ -168,7 +169,8 @@ public class DingDingRobotParam {
/**
* actionURL
*/
private String actionURL;
@JSONField(name = "actionURL")
private String actionUrl;
}
}
@ -200,11 +202,13 @@ public class DingDingRobotParam {
/**
* messageURL
*/
private String messageURL;
@JSONField(name = "messageURL")
private String messageUrl;
/**
* picURL
*/
private String picURL;
@JSONField(name = "picURL")
private String picUrl;
}
}
}

@ -10,6 +10,7 @@ import java.util.Set;
/**
* param
*
* @author 3y
* https://docs.getui.com/getui/server/rest_v2/push/
*/

@ -11,6 +11,7 @@ import lombok.NoArgsConstructor;
/**
*
*
* @author 3y
* https://docs.getui.com/getui/server/rest_v2/common_args/?id=doc-title-1
*/

@ -9,6 +9,7 @@ import java.util.List;
/**
*
*
* @author 3y
*/
@NoArgsConstructor

@ -15,7 +15,13 @@ import lombok.ToString;
public enum RateLimitStrategy {
/**
* (QPS
*/
REQUEST_RATE_LIMIT(10, "根据真实请求数限流"),
/**
*
*/
SEND_USER_NUM_RATE_LIMIT(20, "根据发送用户数限流"),
;

@ -3,17 +3,12 @@ package com.java3y.austin.handler.flowcontrol;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.constant.CommonConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import com.java3y.austin.handler.flowcontrol.annotations.LocalRateLimit;
import com.java3y.austin.support.service.ConfigService;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
@ -22,6 +17,11 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author 3y
* @date 2022/4/18

@ -9,6 +9,7 @@ import java.lang.annotation.*;
*
* Created by TOM
* On 2022/7/21 17:03
*
* @author TOM
*/
@Target({ElementType.TYPE})

@ -10,6 +10,7 @@ import com.java3y.austin.handler.flowcontrol.annotations.LocalRateLimit;
/**
* Created by TOM
* On 2022/7/21 17:05
*
* @author TOM
*/
@LocalRateLimit(rateLimitStrategy = RateLimitStrategy.REQUEST_RATE_LIMIT)

@ -10,6 +10,7 @@ import com.java3y.austin.handler.flowcontrol.annotations.LocalRateLimit;
/**
* Created by TOM
* On 2022/7/21 17:14
*
* @author TOM
*/
@LocalRateLimit(rateLimitStrategy = RateLimitStrategy.SEND_USER_NUM_RATE_LIMIT)

@ -6,9 +6,9 @@ import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.handler.flowcontrol.FlowControlFactory;
import com.java3y.austin.handler.flowcontrol.FlowControlParam;
import com.java3y.austin.support.utils.LogUtils;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import java.util.Objects;
/**
@ -54,6 +54,7 @@ public abstract class BaseHandler implements Handler {
flowControlFactory.flowControl(taskInfo, flowControlParam);
}
}
@Override
public void doHandler(TaskInfo taskInfo) {
flowControl(taskInfo);
@ -65,8 +66,6 @@ public abstract class BaseHandler implements Handler {
}
/**
* handler
*
@ -76,5 +75,4 @@ public abstract class BaseHandler implements Handler {
public abstract boolean handler(TaskInfo taskInfo);
}

@ -34,7 +34,7 @@ public class AlipayMiniProgramAccountHandler extends BaseHandler implements Hand
AlipayMiniProgramParam miniProgramParam = buildMiniProgramParam(taskInfo);
try {
alipayMiniProgramAccountService.send(miniProgramParam);
}catch (Exception e) {
} catch (Exception e) {
log.error("AlipayMiniProgramAccountHandler#handler fail:{},params:{}",
Throwables.getStackTraceAsString(e), JSON.toJSONString(taskInfo));
return false;
@ -48,7 +48,7 @@ public class AlipayMiniProgramAccountHandler extends BaseHandler implements Hand
* @param taskInfo
* @return AlipayMiniProgramParam
*/
private AlipayMiniProgramParam buildMiniProgramParam(TaskInfo taskInfo){
private AlipayMiniProgramParam buildMiniProgramParam(TaskInfo taskInfo) {
AlipayMiniProgramParam param = AlipayMiniProgramParam.builder()
.toUserId(taskInfo.getReceiver())
.messageTemplateId(taskInfo.getMessageTemplateId())

@ -7,7 +7,6 @@ import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.constant.CommonConstant;
import com.java3y.austin.common.constant.SendAccountConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.dto.account.DingDingRobotAccount;
import com.java3y.austin.common.dto.model.DingDingRobotContentModel;
@ -65,7 +64,6 @@ public class DingDingRobotHandler extends BaseHandler implements Handler {
}
private DingDingRobotParam assembleParam(TaskInfo taskInfo) {
// 接收者相关
@ -91,13 +89,13 @@ public class DingDingRobotHandler extends BaseHandler implements Handler {
param.setLink(DingDingRobotParam.LinkVO.builder().title(contentModel.getTitle()).text(contentModel.getContent()).messageUrl(contentModel.getUrl()).picUrl(contentModel.getPicUrl()).build());
}
if (SendMessageType.NEWS.getCode().equals(contentModel.getSendType())) {
List<DingDingRobotParam.FeedCardVO.LinksVO> linksVOS = JSON.parseArray(contentModel.getFeedCards(), DingDingRobotParam.FeedCardVO.LinksVO.class);
DingDingRobotParam.FeedCardVO feedCardVO = DingDingRobotParam.FeedCardVO.builder().links(linksVOS).build();
List<DingDingRobotParam.FeedCardVO.LinksVO> linksVoS = JSON.parseArray(contentModel.getFeedCards(), DingDingRobotParam.FeedCardVO.LinksVO.class);
DingDingRobotParam.FeedCardVO feedCardVO = DingDingRobotParam.FeedCardVO.builder().links(linksVoS).build();
param.setFeedCard(feedCardVO);
}
if (SendMessageType.ACTION_CARD.getCode().equals(contentModel.getSendType())) {
List<DingDingRobotParam.ActionCardVO.BtnsVO> btnsVOS = JSON.parseArray(contentModel.getBtns(), DingDingRobotParam.ActionCardVO.BtnsVO.class);
DingDingRobotParam.ActionCardVO actionCardVO = DingDingRobotParam.ActionCardVO.builder().title(contentModel.getTitle()).text(contentModel.getContent()).btnOrientation(contentModel.getBtnOrientation()).btns(btnsVOS).build();
List<DingDingRobotParam.ActionCardVO.BtnsVO> btnsVoS = JSON.parseArray(contentModel.getBtns(), DingDingRobotParam.ActionCardVO.BtnsVO.class);
DingDingRobotParam.ActionCardVO actionCardVO = DingDingRobotParam.ActionCardVO.builder().title(contentModel.getTitle()).text(contentModel.getContent()).btnOrientation(contentModel.getBtnOrientation()).btns(btnsVoS).build();
param.setActionCard(actionCardVO);
}

@ -7,8 +7,10 @@ import com.alibaba.fastjson.JSON;
import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiMessageCorpconversationAsyncsendV2Request;
import com.dingtalk.api.request.OapiMessageCorpconversationGetsendresultRequest;
import com.dingtalk.api.request.OapiMessageCorpconversationRecallRequest;
import com.dingtalk.api.response.OapiMessageCorpconversationAsyncsendV2Response;
import com.dingtalk.api.response.OapiMessageCorpconversationGetsendresultResponse;
import com.dingtalk.api.response.OapiMessageCorpconversationRecallResponse;
import com.google.common.base.Throwables;
import com.java3y.austin.common.constant.AustinConstant;
@ -202,14 +204,13 @@ public class DingDingWorkNoticeHandler extends BaseHandler implements Handler {
*/
public void pull(Long accountId) {
try {
// DingDingWorkNoticeAccount account = accountUtils.getAccountById(accountId, DingDingWorkNoticeAccount.class);
// String accessToken = redisTemplate.opsForValue().get(SendAccountConstant.DING_DING_ACCESS_TOKEN_PREFIX + accountId);
// DingTalkClient client = new DefaultDingTalkClient(PULL_URL);
// OapiMessageCorpconversationGetsendresultRequest req = new OapiMessageCorpconversationGetsendresultRequest();
// req.setAgentId(Long.valueOf(account.getAgentId()));
// req.setTaskId(456L);
// OapiMessageCorpconversationGetsendresultResponse rsp = client.execute(req, accessToken);
// System.out.println(rsp.getBody());
DingDingWorkNoticeAccount account = accountUtils.getAccountById(accountId.intValue(), DingDingWorkNoticeAccount.class);
String accessToken = redisTemplate.opsForValue().get(SendAccountConstant.DING_DING_ACCESS_TOKEN_PREFIX + accountId);
DingTalkClient client = new DefaultDingTalkClient(PULL_URL);
OapiMessageCorpconversationGetsendresultRequest req = new OapiMessageCorpconversationGetsendresultRequest();
req.setAgentId(Long.valueOf(account.getAgentId()));
req.setTaskId(456L);
OapiMessageCorpconversationGetsendresultResponse rsp = client.execute(req, accessToken);
} catch (Exception e) {
log.error("DingDingWorkNoticeHandler#pull fail:{}", Throwables.getStackTraceAsString(e));
}

@ -5,7 +5,6 @@ import cn.hutool.extra.mail.MailAccount;
import cn.hutool.extra.mail.MailUtil;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.common.constant.SendAccountConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.dto.model.EmailContentModel;
import com.java3y.austin.common.enums.ChannelType;
@ -74,6 +73,7 @@ public class EmailHandler extends BaseHandler implements Handler {
}
return account;
}
@Override
public void recall(MessageTemplate messageTemplate) {

@ -1,12 +1,10 @@
package com.java3y.austin.handler.handler.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.constant.CommonConstant;
import com.java3y.austin.common.constant.SendAccountConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.dto.model.EnterpriseWeChatContentModel;
import com.java3y.austin.common.enums.ChannelType;

@ -76,8 +76,8 @@ public class EnterpriseWeChatRobotHandler extends BaseHandler implements Handler
param.setFile(EnterpriseWeChatRobotParam.FileDTO.builder().mediaId(contentModel.getMediaId()).build());
}
if (SendMessageType.NEWS.getCode().equals(contentModel.getSendType())) {
List<EnterpriseWeChatRobotParam.NewsDTO.ArticlesDTO> articlesDTOS = JSON.parseArray(contentModel.getArticles(), EnterpriseWeChatRobotParam.NewsDTO.ArticlesDTO.class);
param.setNews(EnterpriseWeChatRobotParam.NewsDTO.builder().articles(articlesDTOS).build());
List<EnterpriseWeChatRobotParam.NewsDTO.ArticlesDTO> articlesDtoS = JSON.parseArray(contentModel.getArticles(), EnterpriseWeChatRobotParam.NewsDTO.ArticlesDTO.class);
param.setNews(EnterpriseWeChatRobotParam.NewsDTO.builder().articles(articlesDtoS).build());
}
if (SendMessageType.TEMPLATE_CARD.getCode().equals(contentModel.getSendType())) {
//

@ -1,25 +1,17 @@
package com.java3y.austin.handler.handler.impl;
import cn.hutool.core.codec.Base64;
import cn.hutool.core.io.file.FileReader;
import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.http.ContentType;
import cn.hutool.http.Header;
import cn.hutool.http.HttpRequest;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Throwables;
import com.java3y.austin.common.constant.SendAccountConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.dto.account.EnterpriseWeChatRobotAccount;
import com.java3y.austin.common.dto.account.FeiShuRobotAccount;
import com.java3y.austin.common.dto.model.EnterpriseWeChatRobotContentModel;
import com.java3y.austin.common.dto.model.FeiShuRobotContentModel;
import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.common.enums.SendMessageType;
import com.java3y.austin.handler.domain.feishu.FeiShuRobotParam;
import com.java3y.austin.handler.domain.feishu.FeiShuRobotResult;
import com.java3y.austin.handler.domain.wechat.robot.EnterpriseWeChatRobotParam;
import com.java3y.austin.handler.handler.BaseHandler;
import com.java3y.austin.handler.handler.Handler;
import com.java3y.austin.support.domain.MessageTemplate;
@ -78,9 +70,9 @@ public class FeiShuRobotHandler extends BaseHandler implements Handler {
param.setContent(FeiShuRobotParam.ContentDTO.builder().text(contentModel.getContent()).build());
}
if (SendMessageType.RICH_TEXT.getCode().equals(contentModel.getSendType())) {
List<FeiShuRobotParam.ContentDTO.PostDTO.ZhCnDTO.PostContentDTO> postContentDTOS = JSON.parseArray(contentModel.getPostContent(), FeiShuRobotParam.ContentDTO.PostDTO.ZhCnDTO.PostContentDTO.class);
List<FeiShuRobotParam.ContentDTO.PostDTO.ZhCnDTO.PostContentDTO> postContentDtoS = JSON.parseArray(contentModel.getPostContent(), FeiShuRobotParam.ContentDTO.PostDTO.ZhCnDTO.PostContentDTO.class);
List<List<FeiShuRobotParam.ContentDTO.PostDTO.ZhCnDTO.PostContentDTO>> postContentList = new ArrayList<>();
postContentList.add(postContentDTOS);
postContentList.add(postContentDtoS);
FeiShuRobotParam.ContentDTO.PostDTO postDTO = FeiShuRobotParam.ContentDTO.PostDTO.builder()
.zhCn(FeiShuRobotParam.ContentDTO.PostDTO.ZhCnDTO.builder().title(contentModel.getTitle()).content(postContentList).build())
.build();

@ -12,7 +12,6 @@ import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.dto.account.GeTuiAccount;
import com.java3y.austin.common.dto.model.PushContentModel;
import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.domain.push.PushParam;
import com.java3y.austin.handler.domain.push.getui.BatchSendPushParam;
import com.java3y.austin.handler.domain.push.getui.SendPushParam;

@ -29,7 +29,7 @@ public class MessageReceipt {
while (true) {
try {
for (ReceiptMessageStater receiptMessageStater : receiptMessageStaterList) {
receiptMessageStater.start();
//receiptMessageStater.start();
}
Thread.sleep(2000);
} catch (Exception e) {

@ -2,24 +2,16 @@ package com.java3y.austin.handler.receiver.kafka;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.common.domain.LogParam;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.handler.handler.HandlerHolder;
import com.java3y.austin.handler.pending.Task;
import com.java3y.austin.handler.pending.TaskPendingHolder;
import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.domain.MessageTemplate;
import com.java3y.austin.support.utils.LogUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
@ -40,6 +32,7 @@ import java.util.Optional;
public class Receiver {
@Autowired
private ConsumeService consumeService;
/**
*
*
@ -64,12 +57,13 @@ public class Receiver {
/**
*
*
* @param consumerRecord
*/
@KafkaListener(topics = "#{'${austin.business.recall.topic.name}'}",groupId = "#{'${austin.business.recall.group.name}'}",containerFactory = "filterContainerFactory")
public void recall(ConsumerRecord<?,String> consumerRecord){
@KafkaListener(topics = "#{'${austin.business.recall.topic.name}'}", groupId = "#{'${austin.business.recall.group.name}'}", containerFactory = "filterContainerFactory")
public void recall(ConsumerRecord<?, String> consumerRecord) {
Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if(kafkaMessage.isPresent()){
if (kafkaMessage.isPresent()) {
MessageTemplate messageTemplate = JSON.parseObject(kafkaMessage.get(), MessageTemplate.class);
consumeService.consume2recall(messageTemplate);
}

@ -80,6 +80,7 @@ public class ReceiverStart {
/**
* tag
* producer tagheader
*
* @return
*/
@Bean

@ -1,7 +1,6 @@
package com.java3y.austin.handler.receiver.service.impl;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.common.domain.LogParam;
import com.java3y.austin.common.domain.TaskInfo;

@ -25,6 +25,7 @@ public interface SmsScript {
/**
*
*
* @param scriptName
* @return
*/

@ -66,7 +66,7 @@ public class TencentSmsScript implements SmsScript {
PullSmsSendStatusResponse resp = client.PullSmsSendStatus(req);
return assemblePullSmsRecord(account, resp);
} catch (Exception e) {
// log.error("TencentSmsReceipt#pull fail!{}", Throwables.getStackTraceAsString(e));
log.error("TencentSmsReceipt#pull fail!{}", Throwables.getStackTraceAsString(e));
return null;
}
}

@ -12,6 +12,7 @@ public interface ShieldService {
/**
*
*
* @param taskInfo
*/
void shield(TaskInfo taskInfo);

@ -20,6 +20,7 @@ import java.util.HashSet;
/**
*
*
* @author 3y
*/
@Service

@ -32,6 +32,7 @@ public class GroupIdMappingUtils {
/**
* TaskInfogroupId
*
* @param taskInfo
* @return
*/

@ -33,6 +33,7 @@ public class AfterParamCheckAction implements BusinessProcess<SendTaskModel> {
public static final String EMAIL_REGEX_EXP = "^[A-Za-z0-9-_\\u4e00-\\u9fa5]+@[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+$";
public static final HashMap<Integer, String> CHANNEL_REGEX_EXP = new HashMap<>();
static {
CHANNEL_REGEX_EXP.put(IdType.PHONE.getCode(), PHONE_REGEX_EXP);
CHANNEL_REGEX_EXP.put(IdType.EMAIL.getCode(), EMAIL_REGEX_EXP);
@ -55,6 +56,7 @@ public class AfterParamCheckAction implements BusinessProcess<SendTaskModel> {
/**
*
*
*
* @param taskInfo
*/
private void filterIllegalReceiver(List<TaskInfo> taskInfo) {

@ -5,9 +5,7 @@ import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.google.common.base.Throwables;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.constant.CommonConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.dto.model.ContentModel;

@ -12,7 +12,6 @@ import com.java3y.austin.support.pipeline.ProcessContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

@ -6,18 +6,19 @@ import com.java3y.austin.service.api.impl.action.AfterParamCheckAction;
import com.java3y.austin.service.api.impl.action.AssembleAction;
import com.java3y.austin.service.api.impl.action.PreParamCheckAction;
import com.java3y.austin.service.api.impl.action.SendMqAction;
import com.java3y.austin.service.api.impl.domain.SendTaskModel;
import com.java3y.austin.support.pipeline.BusinessProcess;
import com.java3y.austin.support.pipeline.ProcessController;
import com.java3y.austin.support.pipeline.ProcessTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
/**
* apipipeline
*
* @author 3y
*/
@Configuration
@ -38,6 +39,7 @@ public class PipelineConfig {
* 2.
* 3.
* 4. MQ
*
* @return
*/
@Bean("commonSendTemplate")
@ -52,6 +54,7 @@ public class PipelineConfig {
*
* 1.
* 2.MQ
*
* @return
*/
@Bean("recallMessageTemplate")
@ -64,6 +67,7 @@ public class PipelineConfig {
/**
* pipeline
* BusinessCodeProcessTemplate
*
* @return
*/
@Bean

@ -13,6 +13,7 @@ import org.springframework.stereotype.Service;
/**
*
*
* @author 3y
*/
@Service

@ -1,15 +1,6 @@
package com.java3y.austin.service.api.impl.service;
import com.java3y.austin.common.enums.RespStatusEnum;
import com.java3y.austin.common.vo.BasicResultVO;
import com.java3y.austin.service.api.domain.BatchSendRequest;
import com.java3y.austin.service.api.domain.MessageParam;
import com.java3y.austin.service.api.domain.SendRequest;
import com.java3y.austin.service.api.domain.SendResponse;
import com.java3y.austin.service.api.enums.BusinessCode;
import com.java3y.austin.service.api.impl.domain.SendTaskModel;
import com.java3y.austin.support.pipeline.BusinessProcess;
import com.java3y.austin.support.pipeline.ProcessContext;
import com.java3y.austin.support.pipeline.ProcessController;
import com.java3y.austin.support.pipeline.ProcessTemplate;
import org.junit.jupiter.api.Test;
@ -19,14 +10,8 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class SendServiceImplTest {

@ -11,6 +11,7 @@ import java.util.Map;
/**
*
* single
*
* @author 3y
*/
@Data
@ -38,5 +39,5 @@ public class MessageParam {
* @Description:
*
*/
private Map<String,String> extra;
private Map<String, String> extra;
}

@ -9,6 +9,7 @@ import lombok.experimental.Accessors;
/**
* /
*
* @author 3y
*/
@Data
@ -39,5 +40,4 @@ public class SendRequest {
private MessageParam messageParam;
}

@ -7,6 +7,7 @@ import lombok.experimental.Accessors;
/**
*
*
* @author 3y
*/
@Data

@ -13,17 +13,25 @@ import lombok.ToString;
@AllArgsConstructor
public enum BusinessCode {
/** 普通发送流程 */
/**
*
*/
COMMON_SEND("send", "普通发送"),
/** 撤回流程 */
/**
*
*/
RECALL("recall", "撤回消息");
/** code 关联着责任链的模板 */
/**
* code
*/
private final String code;
/** 类型说明 */
/**
*
*/
private final String description;

@ -1,6 +1,5 @@
package com.java3y.austin.service.api.service;
import com.java3y.austin.service.api.domain.BatchSendRequest;
import com.java3y.austin.service.api.domain.SendRequest;
import com.java3y.austin.service.api.domain.SendResponse;

@ -14,6 +14,7 @@ public interface SendService {
/**
*
*
* @param sendRequest
* @return
*/
@ -22,6 +23,7 @@ public interface SendService {
/**
*
*
* @param batchSendRequest
* @return
*/

@ -2,6 +2,7 @@ package com.java3y.austin.stream.constants;
/**
* Flink
*
* @author 3y
*/
public class AustinFlinkConstant {

@ -49,7 +49,7 @@ public class OkHttpConfiguration {
.connectionPool(pool())
.connectTimeout(connectTimeout, TimeUnit.SECONDS)
.readTimeout(readTimeout, TimeUnit.SECONDS)
.writeTimeout(writeTimeout,TimeUnit.SECONDS)
.writeTimeout(writeTimeout, TimeUnit.SECONDS)
.hostnameVerifier((hostname, session) -> true)
.build();
}
@ -61,10 +61,12 @@ public class OkHttpConfiguration {
public void checkClientTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];

@ -10,7 +10,6 @@ import java.util.concurrent.TimeUnit;
/**
* @author 3y
* support 线
*
*/
public class SupportThreadPoolConfig {

@ -10,6 +10,7 @@ import java.util.List;
/**
* Dao
*
* @author 3y
*/
public interface MessageTemplateDao extends JpaRepository<MessageTemplate, Long>, JpaSpecificationExecutor<MessageTemplate> {
@ -17,6 +18,7 @@ public interface MessageTemplateDao extends JpaRepository<MessageTemplate, Long>
/**
* )
*
* @param deleted 0 1
* @param pageable
* @return
@ -26,6 +28,7 @@ public interface MessageTemplateDao extends JpaRepository<MessageTemplate, Long>
/**
*
*
* @param deleted
* @return
*/

@ -27,7 +27,7 @@ import java.io.Serializable;
public class MessageTemplate implements Serializable {
@Id
@GeneratedValue(strategy= GenerationType.IDENTITY)
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
@ -145,5 +145,4 @@ public class MessageTemplate implements Serializable {
private Integer updated;
}

@ -22,7 +22,7 @@ import javax.persistence.Id;
public class SmsRecord {
@Id
@GeneratedValue(strategy= GenerationType.IDENTITY)
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save