|
|
|
@ -1,4 +1,4 @@
|
|
|
|
|
package logdistributor
|
|
|
|
|
package logs
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
rspb "k8s.io/helm/pkg/proto/hapi/release"
|
|
|
|
@ -7,13 +7,13 @@ import (
|
|
|
|
|
type Logsub struct {
|
|
|
|
|
C chan *rspb.Log
|
|
|
|
|
release string
|
|
|
|
|
sources []rspb.LogSource
|
|
|
|
|
level rspb.LogLevel
|
|
|
|
|
sources []rspb.Log_Source
|
|
|
|
|
level rspb.Log_Level
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type release struct {
|
|
|
|
|
name string
|
|
|
|
|
sourceMappings map[rspb.LogSource]map[*Logsub]bool
|
|
|
|
|
sourceMappings map[rspb.Log_Source]map[*Logsub]bool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type Pubsub struct {
|
|
|
|
@ -27,18 +27,18 @@ func New() *Pubsub {
|
|
|
|
|
|
|
|
|
|
func newRelease(name string) *release {
|
|
|
|
|
rs := &release{name: name}
|
|
|
|
|
rs.sourceMappings = make(map[rspb.LogSource]map[*Logsub]bool, len(rspb.LogSource_name))
|
|
|
|
|
rs.sourceMappings = make(map[rspb.Log_Source]map[*Logsub]bool, len(rspb.Log_Source_name))
|
|
|
|
|
return rs
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rs *release) subscribe(sub *Logsub) {
|
|
|
|
|
for _, source := range sub.sources {
|
|
|
|
|
logSource := rspb.LogSource(source)
|
|
|
|
|
if _, ok := rs.sourceMappings[logSource]; !ok {
|
|
|
|
|
Log_Source := rspb.Log_Source(source)
|
|
|
|
|
if _, ok := rs.sourceMappings[Log_Source]; !ok {
|
|
|
|
|
subs := make(map[*Logsub]bool, 1)
|
|
|
|
|
rs.sourceMappings[logSource] = subs
|
|
|
|
|
rs.sourceMappings[Log_Source] = subs
|
|
|
|
|
}
|
|
|
|
|
rs.sourceMappings[logSource][sub] = true
|
|
|
|
|
rs.sourceMappings[Log_Source][sub] = true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -51,7 +51,7 @@ func (ps *Pubsub) subscribe(sub *Logsub) {
|
|
|
|
|
ps.releases[sub.release].subscribe(sub)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ps *Pubsub) Subscribe(release string, level rspb.LogLevel, sources ...rspb.LogSource) *Logsub {
|
|
|
|
|
func (ps *Pubsub) Subscribe(release string, level rspb.Log_Level, sources ...rspb.Log_Source) *Logsub {
|
|
|
|
|
ch := make(chan *rspb.Log)
|
|
|
|
|
ls := &Logsub{C: ch, release: release, level: level, sources: sources}
|
|
|
|
|
ps.subscribe(ls)
|
|
|
|
@ -72,7 +72,7 @@ func (ps *Pubsub) Unsubscribe(sub *Logsub) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ps *Pubsub) PubLog(rls string, source rspb.LogSource, level rspb.LogLevel, message string) {
|
|
|
|
|
func (ps *Pubsub) PubLog(rls string, source rspb.Log_Source, level rspb.Log_Level, message string) {
|
|
|
|
|
log := &rspb.Log{Release: rls, Source: source, Level: level, Log: message}
|
|
|
|
|
if rls, ok := ps.releases[log.Release]; ok {
|
|
|
|
|
if subs, ok := rls.sourceMappings[log.Source]; ok {
|