You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

148 lines
4.1 KiB

7 months ago
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package frontend
import (
"context"
"errors"
"math"
"sync"
"time"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"github.com/jaegertracing/jaeger/examples/hotrod/pkg/log"
"github.com/jaegertracing/jaeger/examples/hotrod/pkg/pool"
"github.com/jaegertracing/jaeger/examples/hotrod/services/config"
"github.com/jaegertracing/jaeger/examples/hotrod/services/customer"
"github.com/jaegertracing/jaeger/examples/hotrod/services/driver"
"github.com/jaegertracing/jaeger/examples/hotrod/services/route"
)
// bestETA bundles the service clients and helpers that Get and getRoutes use
// to pick the driver with the shortest ETA for a customer.
type bestETA struct {
	// customer is queried by Get to load the customer for a customer ID.
	customer customer.Interface
	// driver is queried by Get for the drivers nearest to the customer.
	driver driver.Interface
	// route is queried by getRoutes, once per candidate driver.
	route route.Interface
	// pool executes the per-driver route lookups submitted by getRoutes.
	pool *pool.Pool
	// logger yields a context-bound logger through logger.For(ctx).
	logger log.Factory
}
// Response is the outcome of a successful best-ETA lookup: the chosen driver
// and how long that driver's route is expected to take.
type Response struct {
	// Driver is the ID of the selected driver.
	Driver string
	// ETA is the estimated duration of the selected driver's route.
	ETA time.Duration
}
// newBestETA builds a bestETA whose customer, driver and route clients all use
// the given tracer provider. Each client receives a logger derived from logger
// and tagged with its own "component" field, and is pointed at the matching
// host:port from options. Route lookups go through a pool created with
// config.RouteWorkerPoolSize.
func newBestETA(tracer trace.TracerProvider, logger log.Factory, options ConfigOptions) *bestETA {
	customerClient := customer.NewClient(
		tracer,
		logger.With(zap.String("component", "customer_client")),
		options.CustomerHostPort,
	)
	driverClient := driver.NewClient(
		tracer,
		logger.With(zap.String("component", "driver_client")),
		options.DriverHostPort,
	)
	routeClient := route.NewClient(
		tracer,
		logger.With(zap.String("component", "route_client")),
		options.RouteHostPort,
	)
	workers := pool.New(config.RouteWorkerPoolSize)

	return &bestETA{
		customer: customerClient,
		driver:   driverClient,
		route:    routeClient,
		pool:     workers,
		logger:   logger,
	}
}
// Get picks the nearby driver that can reach the given customer soonest.
//
// It loads the customer, adds the customer's name to the context baggage under
// the key "customer", asks the driver service for drivers near the customer's
// location, and requests a route for every one of them. The driver whose route
// has the smallest ETA is returned.
//
// An error is returned when the customer lookup or the driver search fails,
// when any route lookup fails, or when no route was found. Problems while
// building the baggage are logged and do not abort the request.
func (eta *bestETA) Get(ctx context.Context, customerID int) (*Response, error) {
	// Named cust so the imported customer package is not shadowed.
	cust, err := eta.customer.Get(ctx, customerID)
	if err != nil {
		return nil, err
	}
	eta.logger.For(ctx).Info("Found customer", zap.Any("customer", cust))

	// Baggage is best-effort: each failure is logged and processing continues.
	m, err := baggage.NewMember("customer", cust.Name)
	if err != nil {
		eta.logger.For(ctx).Error("cannot create baggage member", zap.Error(err))
	}
	bag := baggage.FromContext(ctx)
	bag, err = bag.SetMember(m)
	if err != nil {
		eta.logger.For(ctx).Error("cannot set baggage member", zap.Error(err))
	}
	ctx = baggage.ContextWithBaggage(ctx, bag)

	drivers, err := eta.driver.FindNearest(ctx, cust.Location)
	if err != nil {
		return nil, err
	}
	eta.logger.For(ctx).Info("Found drivers", zap.Any("drivers", drivers))

	results := eta.getRoutes(ctx, cust, drivers)
	eta.logger.For(ctx).Info("Found routes", zap.Any("routes", results))

	// Start from the largest possible duration so any real ETA is smaller.
	resp := &Response{ETA: math.MaxInt64}
	for _, result := range results {
		if result.err != nil {
			// Return the failed lookup's own error. The outer err is always
			// nil at this point, so returning it would report success with
			// a nil response.
			return nil, result.err
		}
		if result.route.ETA < resp.ETA {
			resp.ETA = result.route.ETA
			resp.Driver = result.driver
		}
	}
	// Driver is only set once a route has been accepted in the loop above.
	if resp.Driver == "" {
		return nil, errors.New("no routes found")
	}

	eta.logger.For(ctx).Info("Dispatch successful", zap.String("driver", resp.Driver), zap.String("eta", resp.ETA.String()))
	return resp, nil
}
// routeResult is the outcome of one route lookup made by getRoutes.
type routeResult struct {
	// driver is the DriverID of the driver the lookup was made for.
	driver string
	// route is the value returned by FindRoute for that driver.
	route *route.Route
	// err is the error returned by FindRoute for that driver.
	err error
}
// getRoutes asks the route service for a route from each driver's location to
// the customer's location and returns one routeResult per driver. A failed
// lookup is reported through the err field of its result instead of stopping
// the others. Results are appended as lookups finish, so their order may
// differ from the order of drivers.
func (eta *bestETA) getRoutes(ctx context.Context, customer *customer.Customer, drivers []driver.Driver) []routeResult {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		results = make([]routeResult, 0, len(drivers))
	)

	// record stores one outcome; mu guards results because the pool may run
	// several lookups at the same time.
	record := func(r routeResult) {
		mu.Lock()
		results = append(results, r)
		mu.Unlock()
	}

	for i := range drivers {
		d := drivers[i] // per-iteration copy for the closure below
		wg.Add(1)
		// Hand the lookup to the worker pool so requests can (potentially)
		// run in parallel.
		eta.pool.Execute(func() {
			found, err := eta.route.FindRoute(ctx, d.Location, customer.Location)
			record(routeResult{
				driver: d.DriverID,
				route:  found,
				err:    err,
			})
			wg.Done()
		})
	}

	// Block until every submitted lookup has recorded its result.
	wg.Wait()
	return results
}