kafka test go!

pipeline not done
pull/2/head
3y 3 years ago
parent 3fa0a63f98
commit 2678a76b91

@ -11,5 +11,11 @@
<artifactId>austin-service-api-impl</artifactId> <artifactId>austin-service-api-impl</artifactId>
<dependencies>
<dependency>
<groupId>com.java3y.austin</groupId>
<artifactId>austin-service-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project> </project>

@ -0,0 +1,10 @@
package com.java3y.austin.pipeline;
/**
*
*
* @author 3y
*/
public interface BusinessProcess {
void process(ProcessContext context);
}

@ -0,0 +1,24 @@
package com.java3y.austin.pipeline;
/**
*
* @author 3y
*/
public class ProcessContext {
/**
* code
*/
private String code;
/**
*
*/
private ProcessModel processModel;
/**
*
*/
private Boolean needBreak = false;
}

@ -0,0 +1,10 @@
package com.java3y.austin.pipeline;
/**
*
* @author 3y
*/
public class ProcessModel {
}

@ -0,0 +1,19 @@
package com.java3y.austin.pipeline;
import java.util.List;
/**
*
* @author 3y
*/
public class ProcessTemplate {
private List<BusinessProcess> processList;
public List<BusinessProcess> getProcessList() {
return processList;
}
public void setProcessList(List<BusinessProcess> processList) {
this.processList = processList;
}
}

@ -0,0 +1,20 @@
package com.java3y.austin.service;
import com.java3y.austin.domain.SendRequest;
import com.java3y.austin.domain.SendResponse;
/**
*
* @author 3y
*/
public class SendServiceImpl implements SendService {
@Override
public SendResponse send(SendRequest sendRequest) {
return null;
}
@Override
public SendResponse batchSend(SendRequest sendRequest) {
return null;
}
}

@ -0,0 +1,23 @@
package com.java3y.austin.domain;
/**
*
*
*/
public class SendRequest {
/**
* Id
*/
private Long messageTemplateId;
/**
*
*
*/
private String receiver;
}

@ -0,0 +1,4 @@
package com.java3y.austin.domain;
public class SendResponse {
}

@ -0,0 +1,18 @@
package com.java3y.austin.service;
import com.java3y.austin.domain.SendRequest;
import com.java3y.austin.domain.SendResponse;
/**
*
*
* @author 3y
*/
public interface SendService {
SendResponse send(SendRequest sendRequest);
SendResponse batchSend(SendRequest sendRequest);
}

@ -21,6 +21,10 @@
<artifactId>austin-handler</artifactId> <artifactId>austin-handler</artifactId>
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

@ -0,0 +1,24 @@
package com.java3y.austin.controller;
import com.java3y.austin.kafkatest.UserLogProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaTestController {
@Autowired
private UserLogProducer userLogProducer;
/**
* test insert
*/
@GetMapping("/kafka/insert")
public String insert(String userId) {
userLogProducer.sendLog(userId);
return null;
}
}

@ -0,0 +1,18 @@
package com.java3y.austin.kafkatest;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* @Author 18011618
* @Description
* @Date 14:42 2018/7/20
* @Modify By
*/
@Data
@Accessors(chain = true)
public class UserLog {
private String username;
private String userid;
private String state;
}

@ -0,0 +1,30 @@
package com.java3y.austin.kafkatest;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @Author 18011618
* @Description
* @Date 14:50 2018/7/20
* @Modify By
*/
@Component
@Slf4j
public class UserLogConsumer {
@KafkaListener(topics = {"austin"},groupId = "austinGroup1")
public void consumer(ConsumerRecord<?,?> consumerRecord){
//判断是否为null
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
log.info(">>>>>>>>>> record =" + kafkaMessage);
if(kafkaMessage.isPresent()){
//得到Optional实例中的值
Object message = kafkaMessage.get();
System.err.println("消费消息:"+message);
}
}
}

@ -0,0 +1,29 @@
package com.java3y.austin.kafkatest;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
* @Author 18011618
* @Description
* @Date 14:43 2018/7/20
* @Modify By
*/
@Component
public class UserLogProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
*
* @param userid
*/
public void sendLog(String userid){
UserLog userLog = new UserLog();
userLog.setUsername("jhp").setUserid(userid).setState("0");
System.err.println("发送用户日志数据:"+userLog);
kafkaTemplate.send("austin", JSON.toJSONString(userLog));
}
}

@ -25,5 +25,17 @@ spring:
username: username:
password: password:
driver-class-name: driver-class-name:
# kafka相关的信息配置 TODO
kafka:
bootstrap-servers:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto:
offset:
reset: earliest
# tomcat / HikariPool(数据库连接池 配置) TODO # tomcat / HikariPool(数据库连接池 配置) TODO
Loading…
Cancel
Save