fix: fix user module format code

pull/2100/head
Xinwei Xiong (cubxxw) 2 years ago
parent 43c96f30a8
commit 1c29734a76

@ -337,7 +337,19 @@ More details")
// Suppose an error occurs here
err, _ := someFunc()
if err != nil {
return errs.Wrap(err, "doSomething failed")
return errs.WrapMsg(err, "doSomething failed")
}
}
```
It just works if the package is wrong:
```go
func doSomething() error {
// Suppose an error occurs here
err, _ := someFunc()
if err != nil {
return errs.Wrap(err)
}
}
```

@ -65,11 +65,11 @@ func Start(config *config.GlobalConfig, port int, proPort int) error {
// Determine whether zk is passed according to whether it is a clustered deployment
client, err = kdisc.NewDiscoveryRegister(config)
if err != nil {
return errs.Wrap(err, "register discovery err")
return errs.WrapMsg(err, "register discovery err")
}
if err = client.CreateRpcRootNodes(config.GetServiceNames()); err != nil {
return errs.Wrap(err, "create rpc root nodes error")
return errs.WrapMsg(err, "create rpc root nodes error")
}
if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.EncodeConfig()); err != nil {
@ -120,7 +120,7 @@ func Start(config *config.GlobalConfig, port int, proPort int) error {
util.SIGTERMExit()
err := server.Shutdown(ctx)
if err != nil {
return errs.Wrap(err, "shutdown err")
return errs.WrapMsg(err, "shutdown err")
}
case <-netDone:
close(netDone)

@ -47,11 +47,11 @@ func (g *GzipCompressor) Compress(rawData []byte) ([]byte, error) {
gz := gzip.NewWriter(&gzipBuffer)
if _, err := gz.Write(rawData); err != nil {
return nil, errs.Wrap(err, "GzipCompressor.Compress: writing to gzip writer failed")
return nil, errs.WrapMsg(err, "GzipCompressor.Compress: writing to gzip writer failed")
}
if err := gz.Close(); err != nil {
return nil, errs.Wrap(err, "GzipCompressor.Compress: closing gzip writer failed")
return nil, errs.WrapMsg(err, "GzipCompressor.Compress: closing gzip writer failed")
}
return gzipBuffer.Bytes(), nil
@ -65,10 +65,10 @@ func (g *GzipCompressor) CompressWithPool(rawData []byte) ([]byte, error) {
gz.Reset(&gzipBuffer)
if _, err := gz.Write(rawData); err != nil {
return nil, errs.Wrap(err, "GzipCompressor.CompressWithPool: error writing data")
return nil, errs.WrapMsg(err, "GzipCompressor.CompressWithPool: error writing data")
}
if err := gz.Close(); err != nil {
return nil, errs.Wrap(err, "GzipCompressor.CompressWithPool: error closing gzip writer")
return nil, errs.WrapMsg(err, "GzipCompressor.CompressWithPool: error closing gzip writer")
}
return gzipBuffer.Bytes(), nil
}
@ -77,16 +77,16 @@ func (g *GzipCompressor) DeCompress(compressedData []byte) ([]byte, error) {
buff := bytes.NewBuffer(compressedData)
reader, err := gzip.NewReader(buff)
if err != nil {
return nil, errs.Wrap(err, "GzipCompressor.DeCompress: NewReader creation failed")
return nil, errs.WrapMsg(err, "GzipCompressor.DeCompress: NewReader creation failed")
}
decompressedData, err := io.ReadAll(reader)
if err != nil {
return nil, errs.Wrap(err, "GzipCompressor.DeCompress: reading from gzip reader failed")
return nil, errs.WrapMsg(err, "GzipCompressor.DeCompress: reading from gzip reader failed")
}
if err = reader.Close(); err != nil {
// Even if closing the reader fails, we've successfully read the data,
// so we return the decompressed data and an error indicating the close failure.
return decompressedData, errs.Wrap(err, "GzipCompressor.DeCompress: closing gzip reader failed")
return decompressedData, errs.WrapMsg(err, "GzipCompressor.DeCompress: closing gzip reader failed")
}
return decompressedData, nil
}
@ -97,16 +97,16 @@ func (g *GzipCompressor) DecompressWithPool(compressedData []byte) ([]byte, erro
err := reader.Reset(bytes.NewReader(compressedData))
if err != nil {
return nil, errs.Wrap(err, "GzipCompressor.DecompressWithPool: resetting gzip reader failed")
return nil, errs.WrapMsg(err, "GzipCompressor.DecompressWithPool: resetting gzip reader failed")
}
decompressedData, err := io.ReadAll(reader)
if err != nil {
return nil, errs.Wrap(err, "GzipCompressor.DecompressWithPool: reading from pooled gzip reader failed")
return nil, errs.WrapMsg(err, "GzipCompressor.DecompressWithPool: reading from pooled gzip reader failed")
}
if err = reader.Close(); err != nil {
// Similar to DeCompress, return the data and error for close failure.
return decompressedData, errs.Wrap(err, "GzipCompressor.DecompressWithPool: closing pooled gzip reader failed")
return decompressedData, errs.WrapMsg(err, "GzipCompressor.DecompressWithPool: closing pooled gzip reader failed")
}
return decompressedData, nil
}

@ -37,7 +37,7 @@ func (g *GobEncoder) Encode(data any) ([]byte, error) {
enc := gob.NewEncoder(&buff)
err := enc.Encode(data)
if err != nil {
return nil, errs.Wrap(err, "GobEncoder.Encode failed")
return nil, errs.WrapMsg(err, "GobEncoder.Encode failed")
}
return buff.Bytes(), nil
}
@ -47,7 +47,7 @@ func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
dec := gob.NewDecoder(buff)
err := dec.Decode(decodeData)
if err != nil {
return errs.Wrap(err, "GobEncoder.Decode failed")
return errs.WrapMsg(err, "GobEncoder.Decode failed")
}
return nil
}

@ -75,7 +75,7 @@ func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) er
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
// The upgrader.Upgrade method usually returns enough error messages to diagnose problems that may occur during the upgrade
return errs.Wrap(err, "GenerateLongConn: WebSocket upgrade failed")
return errs.WrapMsg(err, "GenerateLongConn: WebSocket upgrade failed")
}
d.conn = conn
return nil
@ -106,7 +106,7 @@ func (d *GWebSocket) SetWriteDeadline(timeout time.Duration) error {
// TODO SetWriteDeadline Future add error handling
if err := d.conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
return errs.Wrap(err, "GWebSocket.SetWriteDeadline failed")
return errs.WrapMsg(err, "GWebSocket.SetWriteDeadline failed")
}
return nil
}

@ -119,10 +119,10 @@ func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDi
func (g GrpcHandler) GetSeq(context context.Context, data *Req) ([]byte, error) {
req := sdkws.GetMaxSeqReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, errs.Wrap(err, "GetSeq: error unmarshaling request")
return nil, errs.WrapMsg(err, "GetSeq: error unmarshaling request")
}
if err := g.validate.Struct(&req); err != nil {
return nil, errs.Wrap(err, "GetSeq: validation failed")
return nil, errs.WrapMsg(err, "GetSeq: validation failed")
}
resp, err := g.msgRpcClient.GetMaxSeq(context, &req)
if err != nil {
@ -130,7 +130,7 @@ func (g GrpcHandler) GetSeq(context context.Context, data *Req) ([]byte, error)
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.Wrap(err, "GetSeq: error marshaling response")
return nil, errs.WrapMsg(err, "GetSeq: error marshaling response")
}
return c, nil
}
@ -141,12 +141,12 @@ func (g GrpcHandler) SendMessage(ctx context.Context, data *Req) ([]byte, error)
// Unmarshal the message data from the request.
var msgData sdkws.MsgData
if err := proto.Unmarshal(data.Data, &msgData); err != nil {
return nil, errs.Wrap(err, "error unmarshalling message data")
return nil, errs.WrapMsg(err, "error unmarshalling message data")
}
// Validate the message data structure.
if err := g.validate.Struct(&msgData); err != nil {
return nil, errs.Wrap(err, "message data validation failed")
return nil, errs.WrapMsg(err, "message data validation failed")
}
req := msg.SendMsgReq{MsgData: &msgData}
@ -158,7 +158,7 @@ func (g GrpcHandler) SendMessage(ctx context.Context, data *Req) ([]byte, error)
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.Wrap(err, "error marshaling response")
return nil, errs.WrapMsg(err, "error marshaling response")
}
return c, nil
@ -171,7 +171,7 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]by
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.Wrap(err, "error marshaling response")
return nil, errs.WrapMsg(err, "error marshaling response")
}
return c, nil
}
@ -179,10 +179,10 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]by
func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([]byte, error) {
req := sdkws.PullMessageBySeqsReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, errs.Wrap(err, "error unmarshaling request")
return nil, errs.WrapMsg(err, "error unmarshaling request")
}
if err := g.validate.Struct(data); err != nil {
return nil, errs.Wrap(err, "validation failed")
return nil, errs.WrapMsg(err, "validation failed")
}
resp, err := g.msgRpcClient.PullMessageBySeqList(context, &req)
if err != nil {
@ -190,7 +190,7 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.Wrap(err, "error marshaling response")
return nil, errs.WrapMsg(err, "error marshaling response")
}
return c, nil
}
@ -198,7 +198,7 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([
func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, error) {
req := push.DelUserPushTokenReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, errs.Wrap(err, "error unmarshaling request")
return nil, errs.WrapMsg(err, "error unmarshaling request")
}
resp, err := g.pushClient.DelUserPushToken(context, &req)
if err != nil {
@ -206,7 +206,7 @@ func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, err
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.Wrap(err, "error marshaling response")
return nil, errs.WrapMsg(err, "error marshaling response")
}
return c, nil
}
@ -214,10 +214,10 @@ func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, err
func (g GrpcHandler) SetUserDeviceBackground(_ context.Context, data *Req) ([]byte, bool, error) {
req := sdkws.SetAppBackgroundStatusReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, false, errs.Wrap(err, "error unmarshaling request")
return nil, false, errs.WrapMsg(err, "error unmarshaling request")
}
if err := g.validate.Struct(data); err != nil {
return nil, false, errs.Wrap(err, "validation failed")
return nil, false, errs.WrapMsg(err, "validation failed")
}
return nil, req.IsBackground, nil
}

@ -198,7 +198,7 @@ func (ws *WsServer) Run(done chan error) error {
http.HandleFunc("/", ws.wsHandler)
err := server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
netErr = errs.Wrap(err, "ws start err", server.Addr)
netErr = errs.WrapMsg(err, "ws start err", server.Addr)
close(netDone)
}
}()

@ -457,7 +457,7 @@ func (s *friendServer) UpdateFriends(
err = s.notificationSender.FriendsInfoUpdateNotification(ctx, req.OwnerUserID, req.FriendUserIDs)
if err != nil {
return nil, errs.Wrap(err, "FriendsInfoUpdateNotification Error")
return nil, errs.WrapMsg(err, "FriendsInfoUpdateNotification Error")
}
return resp, nil
}

@ -56,7 +56,7 @@ func StartTask(config *config.GlobalConfig) error {
fmt.Printf("Start msgDestruct cron task, cron config: %s\n", config.MsgDestructTime)
_, err = crontab.AddFunc(config.MsgDestructTime, cronWrapFunc(config, rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs))
if err != nil {
return errs.Wrap(err, "cron_conversations_destruct_msgs")
return errs.WrapMsg(err, "cron_conversations_destruct_msgs")
}
// start crontab

@ -112,13 +112,13 @@ func initCfg() (*config.GlobalConfig, error) {
cfgPath := flag.String("c", defaultCfgPath, "Path to the configuration file")
data, err := os.ReadFile(*cfgPath)
if err != nil {
return nil, errs.Wrap(err, "ReadFile unmarshal failed")
return nil, errs.WrapMsg(err, "ReadFile unmarshal failed")
}
conf := config.NewGlobalConfig()
err = yaml.Unmarshal(data, &conf)
if err != nil {
return nil, errs.Wrap(err, "InitConfig unmarshal failed")
return nil, errs.WrapMsg(err, "InitConfig unmarshal failed")
}
return conf, nil
}

@ -82,7 +82,7 @@ func (rc *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts))
cmdOpts := rc.applyOptions(opts...)
if err := rc.initializeLogger(cmdOpts); err != nil {
return errs.Wrap(err, "failed to initialize logger")
return errs.WrapMsg(err, "failed to initialize logger")
}
return nil

@ -150,7 +150,7 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
}
bs, err := json.Marshal(t)
if err != nil {
return "", errs.Wrap(err, "marshal failed")
return "", errs.WrapMsg(err, "marshal failed")
}
write = true

@ -335,7 +335,7 @@ func (c *msgCache) PipeGetMessagesBySeq(ctx context.Context, conversationID stri
_, err = pipe.Exec(ctx)
if err != nil && err != redis.Nil {
return seqMsgs, failedSeqs, errs.Wrap(err, "pipe.get")
return seqMsgs, failedSeqs, errs.WrapMsg(err, "pipe.get")
}
for idx, res := range results {
@ -473,7 +473,7 @@ func (c *msgCache) ParallelSetMessageToCache(ctx context.Context, conversationID
err := wg.Wait()
if err != nil {
return 0, errs.Wrap(err, "wg.Wait failed")
return 0, errs.WrapMsg(err, "wg.Wait failed")
}
return len(msgs), nil
@ -640,7 +640,7 @@ func (c *msgCache) PipeDeleteMessages(ctx context.Context, conversationID string
results, err := pipe.Exec(ctx)
if err != nil {
return errs.Wrap(err, "pipe.del")
return errs.WrapMsg(err, "pipe.del")
}
for _, res := range results {

@ -295,7 +295,7 @@ func (u *UserCacheRedis) refreshStatusOnline(ctx context.Context, userID string,
onlineStatus.UserID = userID
newjsonData, err := json.Marshal(&onlineStatus)
if err != nil {
return errs.Wrap(err, "json.Marshal failed")
return errs.WrapMsg(err, "json.Marshal failed")
}
_, err = u.rdb.HSet(ctx, key, userID, string(newjsonData)).Result()
if err != nil {

@ -72,7 +72,7 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
tokenString, err := token.SignedString([]byte(a.accessSecret))
if err != nil {
return "", errs.Wrap(err, "token.SignedString")
return "", errs.WrapMsg(err, "token.SignedString")
}
return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken)
}

@ -72,7 +72,7 @@ func NewOSS(conf Config) (s3.Interface, error) {
}
bucket, err := client.Bucket(conf.Bucket)
if err != nil {
return nil, errs.Wrap(err, "ali-oss bucket error")
return nil, errs.WrapMsg(err, "ali-oss bucket error")
}
if conf.BucketURL[len(conf.BucketURL)-1] != '/' {
conf.BucketURL += "/"
@ -209,7 +209,7 @@ func (o *OSS) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, erro
} else {
res.Size, err = strconv.ParseInt(contentLengthStr, 10, 64)
if err != nil {
return nil, errs.Wrap(err, "StatObject content-length parse error")
return nil, errs.WrapMsg(err, "StatObject content-length parse error")
}
if res.Size < 0 {
return nil, errs.Wrap(errors.New("StatObject content-length must be greater than 0"))
@ -220,7 +220,7 @@ func (o *OSS) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, erro
} else {
res.LastModified, err = time.Parse(http.TimeFormat, lastModified)
if err != nil {
return nil, errs.Wrap(err, "StatObject last-modified parse error")
return nil, errs.WrapMsg(err, "StatObject last-modified parse error")
}
}
return res, nil
@ -233,7 +233,7 @@ func (o *OSS) DeleteObject(ctx context.Context, name string) error {
func (o *OSS) CopyObject(ctx context.Context, src string, dst string) (*s3.CopyObjectInfo, error) {
result, err := o.bucket.CopyObject(src, dst)
if err != nil {
return nil, errs.Wrap(err, "CopyObject error")
return nil, errs.WrapMsg(err, "CopyObject error")
}
return &s3.CopyObjectInfo{
Key: dst,
@ -267,7 +267,7 @@ func (o *OSS) ListUploadedParts(ctx context.Context, uploadID string, name strin
Bucket: o.bucket.BucketName,
}, oss.MaxUploads(100), oss.MaxParts(maxParts), oss.PartNumberMarker(partNumberMarker))
if err != nil {
return nil, errs.Wrap(err, "ListUploadedParts error")
return nil, errs.WrapMsg(err, "ListUploadedParts error")
}
res := &s3.ListUploadedPartsResult{
Key: result.Key,
@ -334,7 +334,7 @@ func (o *OSS) AccessURL(ctx context.Context, name string, expire time.Duration,
}
rawParams, err := oss.GetRawParams(opts)
if err != nil {
return "", errs.Wrap(err, "AccessURL error")
return "", errs.WrapMsg(err, "AccessURL error")
}
params := getURLParams(*o.bucket.Client.Conn, rawParams)
return getURL(o.um, o.bucket.BucketName, name, params).String(), nil
@ -356,12 +356,12 @@ func (o *OSS) FormData(ctx context.Context, name string, size int64, contentType
}
policyJson, err := json.Marshal(policy)
if err != nil {
return nil, errs.Wrap(err, "Marshal json error")
return nil, errs.WrapMsg(err, "Marshal json error")
}
policyStr := base64.StdEncoding.EncodeToString(policyJson)
h := hmac.New(sha1.New, []byte(o.credentials.GetAccessKeySecret()))
if _, err := io.WriteString(h, policyStr); err != nil {
return nil, errs.Wrap(err, "WriteString error")
return nil, errs.WrapMsg(err, "WriteString error")
}
fd := &s3.FormData{
URL: o.bucketURL,

@ -149,7 +149,7 @@ func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...strin
_, err := indexView.CreateOne(context.Background(), index, opts)
if err != nil {
return errs.Wrap(err, "CreateIndex")
return errs.WrapMsg(err, "CreateIndex")
}
return nil
}

@ -1110,7 +1110,7 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa
var msgsDocs []docModel
err = cursor.All(ctx, &msgsDocs)
if err != nil {
return 0, nil, errs.Wrap(err, "cursor.All msgsDocs")
return 0, nil, errs.WrapMsg(err, "cursor.All msgsDocs")
}
log.ZDebug(ctx, "query mongoDB", "result", msgsDocs)
msgs := make([]*table.MsgInfoModel, 0)
@ -1135,12 +1135,12 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa
}
data, err := json.Marshal(&revokeContent)
if err != nil {
return 0, nil, errs.Wrap(err, "json.Marshal revokeContent")
return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent")
}
elem := sdkws.NotificationElem{Detail: string(data)}
content, err := json.Marshal(&elem)
if err != nil {
return 0, nil, errs.Wrap(err, "json.Marshal elem")
return 0, nil, errs.WrapMsg(err, "json.Marshal elem")
}
msgInfo.Msg.ContentType = constant.MsgRevokeNotification
msgInfo.Msg.Content = string(content)

@ -118,7 +118,7 @@ func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string
opts,
)
if err != nil {
return errs.Wrap(err, "transaction failed")
return errs.WrapMsg(err, "transaction failed")
}
}
return nil

@ -133,7 +133,7 @@ func (cd *ConnDirect) GetConn(ctx context.Context, serviceName string, opts ...g
// Try to dial a new connection
conn, err := cd.dialService(ctx, result, append(cd.additionalOpts, opts...)...)
if err != nil {
return nil, errs.Wrap(err, "address", result)
return nil, errs.WrapMsg(err, "address", result)
}
// Store the new connection
@ -160,7 +160,7 @@ func (cd *ConnDirect) dialService(ctx context.Context, address string, opts ...g
conn, err := grpc.DialContext(ctx, cd.resolverDirect.Scheme()+":///"+address, options...)
if err != nil {
return nil, errs.Wrap(err, "address", address)
return nil, errs.WrapMsg(err, "address", address)
}
return conn, nil
}

@ -66,12 +66,12 @@ func Post(ctx context.Context, url string, header map[string]string, data any, t
jsonStr, err := json.Marshal(data)
if err != nil {
return nil, errs.Wrap(err, "Post: JSON marshal failed")
return nil, errs.WrapMsg(err, "Post: JSON marshal failed")
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(jsonStr))
if err != nil {
return nil, errs.Wrap(err, "Post: NewRequestWithContext failed")
return nil, errs.WrapMsg(err, "Post: NewRequestWithContext failed")
}
if operationID, _ := ctx.Value(constant.OperationID).(string); operationID != "" {
@ -84,13 +84,13 @@ func Post(ctx context.Context, url string, header map[string]string, data any, t
resp, err := client.Do(req)
if err != nil {
return nil, errs.Wrap(err, "Post: client.Do failed")
return nil, errs.WrapMsg(err, "Post: client.Do failed")
}
defer resp.Body.Close()
result, err := io.ReadAll(resp.Body)
if err != nil {
return nil, errs.Wrap(err, "Post: ReadAll failed")
return nil, errs.WrapMsg(err, "Post: ReadAll failed")
}
return result, nil
@ -103,7 +103,7 @@ func PostReturn(ctx context.Context, url string, header map[string]string, input
}
err = json.Unmarshal(b, output)
if err != nil {
return errs.Wrap(err, "PostReturn: JSON unmarshal failed")
return errs.WrapMsg(err, "PostReturn: JSON unmarshal failed")
}
return nil
}

@ -57,13 +57,13 @@ func NewKafkaConsumer(addr []string, topic string, config *config.GlobalConfig)
}
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
if err != nil {
return nil, errs.Wrap(err, "NewKafkaConsumer: creating consumer failed")
return nil, errs.WrapMsg(err, "NewKafkaConsumer: creating consumer failed")
}
p.Consumer = consumer
partitionList, err := consumer.Partitions(p.Topic)
if err != nil {
return nil, errs.Wrap(err, "NewKafkaConsumer: getting partitions failed")
return nil, errs.WrapMsg(err, "NewKafkaConsumer: getting partitions failed")
}
p.PartitionList = partitionList

@ -160,10 +160,10 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag
// Marshal the protobuf message
bMsg, err := proto.Marshal(msg)
if err != nil {
return 0, 0, errs.Wrap(err, "kafka proto Marshal err")
return 0, 0, errs.WrapMsg(err, "kafka proto Marshal err")
}
if len(bMsg) == 0 {
return 0, 0, errs.Wrap(errEmptyMsg, "")
return 0, 0, errs.WrapMsg(errEmptyMsg, "kafka proto Marshal err")
}
// Prepare Kafka message

@ -61,7 +61,7 @@ func Start(
rpcTcpAddr,
)
if err != nil {
return errs.Wrap(err, "listen err", rpcTcpAddr)
return errs.WrapMsg(err, "listen err", rpcTcpAddr)
}
defer listener.Close()
@ -119,7 +119,7 @@ func Start(
// Create a HTTP server for prometheus.
httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
netErr = errs.Wrap(err, "prometheus start err", httpServer.Addr)
netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr)
netDone <- struct{}{}
}
}
@ -128,7 +128,7 @@ func Start(
go func() {
err := srv.Serve(listener)
if err != nil {
netErr = errs.Wrap(err, "rpc start err: ", rpcTcpAddr)
netErr = errs.WrapMsg(err, "rpc start err: ", rpcTcpAddr)
netDone <- struct{}{}
}
}()
@ -147,7 +147,7 @@ func Start(
defer cancel()
err := httpServer.Shutdown(ctx)
if err != nil {
return errs.Wrap(err, "shutdown err")
return errs.WrapMsg(err, "shutdown err")
}
return nil
case <-netDone:

@ -55,7 +55,7 @@ func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byt
if clientCertFile != "" && clientKeyFile != "" {
certPEMBlock, err := os.ReadFile(clientCertFile)
if err != nil {
return nil, errs.Wrap(err, "NewTLSConfig: failed to read client cert file")
return nil, errs.WrapMsg(err, "NewTLSConfig: failed to read client cert file")
}
keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd)
if err != nil {
@ -64,7 +64,7 @@ func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byt
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
if err != nil {
return nil, errs.Wrap(err, "NewTLSConfig: failed to create X509 key pair")
return nil, errs.WrapMsg(err, "NewTLSConfig: failed to create X509 key pair")
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
@ -72,7 +72,7 @@ func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byt
if caCertFile != "" {
caCert, err := os.ReadFile(caCertFile)
if err != nil {
return nil, errs.Wrap(err, "NewTLSConfig: failed to read CA cert file")
return nil, errs.WrapMsg(err, "NewTLSConfig: failed to read CA cert file")
}
caCertPool := x509.NewCertPool()

@ -48,13 +48,13 @@ var (
func initCfg() (*config.GlobalConfig, error) {
data, err := os.ReadFile(*cfgPath)
if err != nil {
return nil, errs.Wrap(err, "ReadFile unmarshal failed")
return nil, errs.WrapMsg(err, "ReadFile unmarshal failed")
}
conf := config.NewGlobalConfig()
err = yaml.Unmarshal(data, &conf)
if err != nil {
return nil, errs.Wrap(err, "InitConfig unmarshal failed")
return nil, errs.WrapMsg(err, "InitConfig unmarshal failed")
}
return conf, nil
}
@ -309,7 +309,7 @@ func configGetEnv(config *config.GlobalConfig) error {
config.Mongo.Database = getEnv("MONGO_DATABASE", config.Mongo.Database)
maxPoolSize, err := getEnvInt("MONGO_MAX_POOL_SIZE", config.Mongo.MaxPoolSize)
if err != nil {
return errs.Wrap(err, "MONGO_MAX_POOL_SIZE")
return errs.WrapMsg(err, "MONGO_MAX_POOL_SIZE")
}
config.Mongo.MaxPoolSize = maxPoolSize
@ -360,7 +360,7 @@ func getEnvInt(key string, fallback int) (int, error) {
if value, exists := os.LookupEnv(key); exists {
val, err := strconv.Atoi(value)
if err != nil {
return 0, errs.Wrap(err, "string to int failed")
return 0, errs.WrapMsg(err, "string to int failed")
}
return val, nil
}

Loading…
Cancel
Save