Traefik Architecture Diagram

Traefik 是一个被广泛采用的开源 HTTP 反向代理和负载均衡器,它简化了现代 Web 应用程序的路由和请求负载均衡。它拥有动态配置功能,并支持众多提供商 (providers),定位为编排复杂部署场景的多功能解决方案。在这篇博文中,我们将深入探讨 Traefik 的架构,并剖析其源代码的关键组件,以便更细致地了解其运行机制。

Traefik 架构:高层概述

在其核心,Traefik 的架构由几个主要组件组成,这些组件协同工作以促进动态路由和负载均衡:

  • 静态配置 (Static Configuration): 这些是 Traefik 的基础设置,包括入口点 (entry points)、提供商 (providers) 和 API 访问配置。它们可以通过文件、命令行参数或环境变量来指定。

  • 动态配置 (Dynamic Configuration): 这涉及可根据基础设施状态进行调整的路由规则、服务和中间件。Traefik 与多种提供商(如 Docker、Kubernetes、Consul Catalog 等)的兼容性突显了其动态特性。

  • 提供商 (Providers): 作为 Traefik 与服务发现机制之间的桥梁,提供商的任务是获取并将动态配置传递给 Traefik。每个提供商都是为集成不同技术(如 Docker、Kubernetes 和 Consul)而定制的。

  • 入口点 (Entry Points): 这些指定了 Traefik 监听入站流量的网络接口(端口和协议)。

  • 路由器 (Routers): 路由器建立通过各种参数(如主机名、路径和头部)匹配传入请求的标准。它们负责将请求引导至适当的服务。

  • 服务 (Services): 服务定义了 Traefik 与处理请求的后端系统进行通信的机制。可以通过不同的负载均衡策略和健康检查对服务进行微调。

  • 中间件 (Middlewares): 作为模块化实体,中间件可以附加到路由器或服务上,启用一系列功能,包括身份验证、速率限制和请求修改。

  • 插件 (Plugins): Traefik 可以通过插件进行扩展,从而增强其功能。插件系统允许开发人员贡献自定义插件。

这些组件的编排如下:

  1. Traefik 摄取静态配置并开始监控指定的入口点。
  2. 提供商持续扫描基础设施以查找配置更改,并通过动态配置通道将其转发给 Traefik。
  3. Traefik 处理动态配置,更新其内部结构——路由器、服务和中间件——以反映这些更改。
  4. 当请求进入时,Traefik 根据路由器规则对其进行匹配,并将其引导至指定的服务。
  5. 服务应用其负载均衡逻辑将请求分发到后端服务器。
  6. 中间件可以在请求/响应生命周期的各个阶段被调用,以执行其各自的职责。

这种动态架构使 Traefik 能够敏捷地适应基础设施内的变化,自动调整路由配置以与已部署的服务保持一致。

源码分析:关键组件

现在,让我们检查 Traefik 源代码中的一些关键组件:

1. Watcher 和 Listener:

Watcher 和 Listener 协作以检测和执行动态配置更改。Watcher 负责不断调查配置源并将更新推送到 Listener。随后,Listener 验证这些更新并将它们与 Traefik 的内部状态集成。

Watcher:
pkg/server/configurationwatcher.go

// NewConfigurationWatcher creates a new ConfigurationWatcher.  
func NewConfigurationWatcher(  
   routinesPool *safe.Pool,  
   pvd provider.Provider,  
   defaultEntryPoints []string,  
   requiredProvider string,  
) *ConfigurationWatcher {  
   return &ConfigurationWatcher{  
      providerAggregator:  pvd,  
      allProvidersConfigs: make(chan dynamic.Message, 100),  
      newConfigs:          make(chan dynamic.Configurations),  
      routinesPool:        routinesPool,  
      defaultEntryPoints:  defaultEntryPoints,  
      requiredProvider:    requiredProvider,  
   }  
}  
  
// Start the configuration watcher.
func (c *ConfigurationWatcher) Start() {  
   c.routinesPool.GoCtx(c.receiveConfigurations)  
   c.routinesPool.GoCtx(c.applyConfigurations)  
   c.startProviderAggregator()  
}  
  
// Stop the configuration watcher.
func (c *ConfigurationWatcher) Stop() {  
   close(c.allProvidersConfigs)  
   close(c.newConfigs)  
}  
  
// AddListener adds a new listener function used when new configuration is provided.
func (c *ConfigurationWatcher) AddListener(listener func(dynamic.Configuration)) {  
   if c.configurationListeners == nil {  
      c.configurationListeners = make([]func(dynamic.Configuration), 0)  
   }  
   c.configurationListeners = append(c.configurationListeners, listener)  
}

func (c *ConfigurationWatcher) startProviderAggregator() {  
   log.Info().Msgf("Starting provider aggregator %T", c.providerAggregator)  
  
   safe.Go(func() {  
      err := c.providerAggregator.Provide(c.allProvidersConfigs, c.routinesPool)  
      if err != nil {  
         log.Error().Err(err).Msgf("Error starting provider aggregator %T", c.providerAggregator)  
      }  
   })  
}

Listener:
cmd/traefik/traefik.go

// Server Transports  
watcher.AddListener(func(conf dynamic.Configuration) {  
   roundTripperManager.Update(conf.HTTP.ServersTransports)  
   dialerManager.Update(conf.TCP.ServersTransports)  
})

pkg/server/service/roundtripper.go

// RoundTripperManager handles roundtripper for the reverse proxy.
type RoundTripperManager struct {  
   rtLock        sync.RWMutex  
   roundTrippers map[string]http.RoundTripper  
   configs       map[string]*dynamic.ServersTransport  
  
   spiffeX509Source SpiffeX509Source  
}  
  
// Update updates the roundtrippers configurations.
func (r *RoundTripperManager) Update(newConfigs map[string]*dynamic.ServersTransport) {  
   r.rtLock.Lock()  
   defer r.rtLock.Unlock()  
  
   for configName, config := range r.configs {  
      newConfig, ok := newConfigs[configName]  
      if !ok {  
         delete(r.configs, configName)  
         delete(r.roundTrippers, configName)  
         continue  
      }  
  
      if reflect.DeepEqual(newConfig, config) {  
         continue  
      }  
  
      var err error  
      r.roundTrippers[configName], err = r.createRoundTripper(newConfig)  
      if err != nil {  
         log.Error().Err(err).Msgf("Could not configure HTTP Transport %s, fallback on default transport", configName)  
         r.roundTrippers[configName] = http.DefaultTransport  
      }  
   }  
  
   for newConfigName, newConfig := range newConfigs {  
      if _, ok := r.configs[newConfigName]; ok {  
         continue  
      }  
  
      var err error  
      r.roundTrippers[newConfigName], err = r.createRoundTripper(newConfig)  
      if err != nil {  
         log.Error().Err(err).Msgf("Could not configure HTTP Transport %s, fallback on default transport", newConfigName)  
         r.roundTrippers[newConfigName] = http.DefaultTransport  
      }  
   }  
  
   r.configs = newConfigs  
}

2. Provider:

Provider 在 Traefik 的架构中至关重要,它与特定技术接口以发掘并传递配置更新。Provide 函数位于 Provider 角色的核心,管理连接、发现和更新过程。

Kubernetes Provider: 配置发现

Traefik 中的 Kubernetes 提供商从 Kubernetes API 服务器中提取配置详细信息,监控 Kubernetes 资源(如 Ingress、IngressRoute、Service 和 Middleware 对象)的变化。

2.1. Kubernetes 客户端初始化:

提供商初始化一个客户端以连接到 Kubernetes API,利用此客户端与 API 交互并观察资源变化。

pkg/provider/kubernetes/crd/kubernetes.go

func (p *Provider) newK8sClient(ctx context.Context) (*clientWrapper, error) {  
   _, err := labels.Parse(p.LabelSelector)  
   if err != nil {  
      return nil, fmt.Errorf("invalid label selector: %q", p.LabelSelector)  
   }  
   log.Ctx(ctx).Info().Msgf("label selector is: %q", p.LabelSelector)  
  
   withEndpoint := ""  
   if p.Endpoint != "" {  
      withEndpoint = fmt.Sprintf(" with endpoint %s", p.Endpoint)  
   }  
  
   var client *clientWrapper  
   switch {  
   case os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "":  
      log.Ctx(ctx).Info().Msgf("Creating in-cluster Provider client%s", withEndpoint)  
      client, err = newInClusterClient(p.Endpoint)  
   case os.Getenv("KUBECONFIG") != "":  
      log.Ctx(ctx).Info().Msgf("Creating cluster-external Provider client from KUBECONFIG %s", os.Getenv("KUBECONFIG"))  
      client, err = newExternalClusterClientFromFile(os.Getenv("KUBECONFIG"))  
   default:  
      log.Ctx(ctx).Info().Msgf("Creating cluster-external Provider client%s", withEndpoint)  
      client, err = newExternalClusterClient(p.Endpoint, p.CertAuthFilePath, p.Token)  
   }  
  
   if err != nil {  
      return nil, err  
   }  
  
   client.labelSelector = p.LabelSelector  
   return client, nil  
}

2.2. 资源监控 (Resource Watching):

提供商使用 Kubernetes 客户端来观察相关资源的变化。通常使用 Informers 来有效地跟踪这些资源,并在创建、更新或删除时接收通知。

pkg/provider/kubernetes/crd/kubernetes.go

// Provide allows the k8s provider to provide configurations to traefik// using the given configuration channel.  
func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {  
   logger := log.With().Str(logs.ProviderName, providerName).Logger()  
   ctxLog := logger.WithContext(context.Background())  
  
   k8sClient, err := p.newK8sClient(ctxLog)  
   if err != nil {  
      return err  
   }  
  
   if p.AllowCrossNamespace {  
      logger.Warn().Msg("Cross-namespace reference between IngressRoutes and resources is enabled, please ensure that this is expected (see AllowCrossNamespace option)")  
   }  
  
   if p.AllowExternalNameServices {  
      logger.Warn().Msg("ExternalName service loading is enabled, please ensure that this is expected (see AllowExternalNameServices option)")  
   }  
  
   pool.GoCtx(func(ctxPool context.Context) {  
      operation := func() error {  
         eventsChan, err := k8sClient.WatchAll(p.Namespaces, ctxPool.Done())  
         if err != nil {  
            logger.Error().Err(err).Msg("Error watching kubernetes events")  
            timer := time.NewTimer(1 * time.Second)  
            select {  
            case <-timer.C:  
               return err  
            case <-ctxPool.Done():  
               return nil  
            }  
         }  
  
         throttleDuration := time.Duration(p.ThrottleDuration)  
         throttledChan := throttleEvents(ctxLog, throttleDuration, pool, eventsChan)  
         if throttledChan != nil {  
            eventsChan = throttledChan  
         }  
  
         for {  
            select {  
            case <-ctxPool.Done():  
               return nil  
            case event := <-eventsChan:  
               // Note that event is the *first* event that came in during this throttling interval -- if we're hitting our throttle, we may have dropped events.  
               // This is fine, because we don't treat different event types differently.               
               // But if we do in the future, we'll need to track more information about the dropped events.               
               conf := p.loadConfigurationFromCRD(ctxLog, k8sClient)  
  
               confHash, err := hashstructure.Hash(conf, nil)  
               switch {  
               case err != nil:  
                  logger.Error().Err(err).Msg("Unable to hash the configuration")  
               case p.lastConfiguration.Get() == confHash:  
                  logger.Debug().Msgf("Skipping Kubernetes event kind %T", event)  
               default:  
                  p.lastConfiguration.Set(confHash)  
                  configurationChan <- dynamic.Message{  
                     ProviderName:  providerName,  
                     Configuration: conf,  
                  }  
               }  
  
               // If we're throttling,  
               // we sleep here for the throttle duration to enforce that we don't refresh faster than our throttle.               
               // time.Sleep returns immediately if p.ThrottleDuration is 0 (no throttle).               
               time.Sleep(throttleDuration)  
            }  
         }  
      }  
  
      notify := func(err error, time time.Duration) {  
         logger.Error().Err(err).Msgf("Provider error, retrying in %s", time)  
      }  
      err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctxPool), notify)  
      if err != nil {  
         logger.Error().Err(err).Msg("Cannot retrieve data")  
      }  
   })  
  
   return nil  
}

2.3. 配置解析 (Configuration Parsing):

接收到资源更新后,提供商会解析数据以提取配置详细信息。然后,这些配置被转换为 Traefik 特定的格式,用于路由器、服务和中间件。

pkg/provider/kubernetes/crd/kubernetes.go

func (p *Provider) loadConfigurationFromCRD(ctx context.Context, client Client) *dynamic.Configuration {
    // ...
    conf := &dynamic.Configuration{  
      // TODO: choose between mutating and returning tlsConfigs  
      HTTP: p.loadIngressRouteConfiguration(ctx, client, tlsConfigs),  
      TCP:  p.loadIngressRouteTCPConfiguration(ctx, client, tlsConfigs),  
      UDP:  p.loadIngressRouteUDPConfiguration(ctx, client),  
      TLS: &dynamic.TLSConfiguration{  
         Options: buildTLSOptions(ctx, client),  
         Stores:  stores,  
      },  
   }  
   //...
   conf.HTTP.Middlewares[id] = &dynamic.Middleware{  
         AddPrefix:         middleware.Spec.AddPrefix,  
         StripPrefix:       middleware.Spec.StripPrefix,  
         StripPrefixRegex:  middleware.Spec.StripPrefixRegex,  
         ReplacePath:       middleware.Spec.ReplacePath,  
         ReplacePathRegex:  middleware.Spec.ReplacePathRegex,  
         Chain:             createChainMiddleware(ctxMid, middleware.Namespace, middleware.Spec.Chain),  
         IPWhiteList:       middleware.Spec.IPWhiteList,  
         IPAllowList:       middleware.Spec.IPAllowList,  
         Headers:           middleware.Spec.Headers,  
         Errors:            errorPage,  
         RateLimit:         rateLimit,  
         RedirectRegex:     middleware.Spec.RedirectRegex,  
         RedirectScheme:    middleware.Spec.RedirectScheme,  
         BasicAuth:         basicAuth,  
         DigestAuth:        digestAuth,  
         ForwardAuth:       forwardAuth,  
         InFlightReq:       middleware.Spec.InFlightReq,  
         Buffering:         middleware.Spec.Buffering,  
         CircuitBreaker:    circuitBreaker,  
         Compress:          middleware.Spec.Compress,  
         PassTLSClientCert: middleware.Spec.PassTLSClientCert,  
         Retry:             retry,  
         ContentType:       middleware.Spec.ContentType,  
         GrpcWeb:           middleware.Spec.GrpcWeb,  
         Plugin:            plugin,  
      }  
   }  
   //...
      cb := configBuilder{  
      client:                    client,  
      allowCrossNamespace:       p.AllowCrossNamespace,  
      allowExternalNameServices: p.AllowExternalNameServices,  
      allowEmptyServices:        p.AllowEmptyServices,  
   }  
  
   for _, service := range client.GetTraefikServices() {  
      err := cb.buildTraefikService(ctx, service, conf.HTTP.Services)  
      if err != nil {  
         log.Ctx(ctx).Error().Str(logs.ServiceName, service.Name).Err(err).  
            Msg("Error while building TraefikService")  
         continue  
      }  
   }  
   //...
    id := provider.Normalize(makeID(serversTransport.Namespace, serversTransport.Name))  
      conf.HTTP.ServersTransports[id] = &dynamic.ServersTransport{  
         ServerName:          serversTransport.Spec.ServerName,  
         InsecureSkipVerify:  serversTransport.Spec.InsecureSkipVerify,  
         RootCAs:             rootCAs,  
         Certificates:        certs,  
         DisableHTTP2:        serversTransport.Spec.DisableHTTP2,  
         MaxIdleConnsPerHost: serversTransport.Spec.MaxIdleConnsPerHost,  
         ForwardingTimeouts:  forwardingTimeout,  
         PeerCertURI:         serversTransport.Spec.PeerCertURI,  
         Spiffe:              serversTransport.Spec.Spiffe,  
      }  
   }  
   // ...
    for _, serversTransportTCP := range client.GetServersTransportTCPs() {  
  
      var tcpServerTransport dynamic.TCPServersTransport  
      tcpServerTransport.SetDefaults()  
  
      if serversTransportTCP.Spec.DialTimeout != nil {  
       // ...
      }  
  
      if serversTransportTCP.Spec.DialKeepAlive != nil {  
       // ...
      }  
  
      if serversTransportTCP.Spec.TerminationDelay != nil {  
       // ...
      }  
  
      if serversTransportTCP.Spec.TLS != nil {  
         var rootCAs []types.FileOrContent  
         for _, secret := range serversTransportTCP.Spec.TLS.RootCAsSecrets {  
            caSecret, err := loadCASecret(serversTransportTCP.Namespace, secret, client)  
            if err != nil {  
               logger.Error().  
                  Err(err).  
                  Str("rootCAs", secret).  
                  Msg("Error while loading rootCAs")  
               continue  
            }  
  
            rootCAs = append(rootCAs, types.FileOrContent(caSecret))  
         }  
  
         var certs tls.Certificates  
         for _, secret := range serversTransportTCP.Spec.TLS.CertificatesSecrets {  
            tlsCert, tlsKey, err := loadAuthTLSSecret(serversTransportTCP.Namespace, secret, client)  
            if err != nil {  
               logger.Error().  
                  Err(err).  
                  Str("certificates", secret).  
                  Msg("Error while loading certificates")  
               continue  
            }  
  
            certs = append(certs, tls.Certificate{  
               CertFile: types.FileOrContent(tlsCert),  
               KeyFile:  types.FileOrContent(tlsKey),  
            })  
         }  
  
         tcpServerTransport.TLS = &dynamic.TLSClientConfig{  
            ServerName:         serversTransportTCP.Spec.TLS.ServerName,  
            InsecureSkipVerify: serversTransportTCP.Spec.TLS.InsecureSkipVerify,  
            RootCAs:            rootCAs,  
            Certificates:       certs,  
            PeerCertURI:        serversTransportTCP.Spec.TLS.PeerCertURI,  
         }  
  
         tcpServerTransport.TLS.Spiffe = serversTransportTCP.Spec.TLS.Spiffe  
      }  
  
      id := provider.Normalize(makeID(serversTransportTCP.Namespace, serversTransportTCP.Name))  
      conf.TCP.ServersTransports[id] = &tcpServerTransport  
   }  
}

2.4. 配置更新:

提供商通过 configurationChan 将 Traefik 就绪的配置传输给 Watcher。Watcher 随后整合这些更新并将它们转发给 Listener 以进行应用。

此过程确保 Traefik 动态更新其路由规则和配置,以与 Kubernetes 集群的演变状态保持一致。

3. Switcher:

Switcher 的本质是 sync.Map + interface{} 的组合,其目的是以线程安全的方式获取并动态更新 handler。

pkg/safe/safe.go

// Safe encapsulates a thread-safe value.
type Safe struct {  
   value interface{}  
   lock  sync.RWMutex  
}

pkg/server/server_entrypoint_tcp.go

func createHTTPServer(ctx context.Context, ln net.Listener, configuration *static.EntryPoint, withH2c bool, reqDecorator *requestdecorator.RequestDecorator) (*httpServer, error) {  
   if configuration.HTTP2.MaxConcurrentStreams < 0 {  
      return nil, errors.New("max concurrent streams value must be greater than or equal to zero")  
   }  
  
   httpSwitcher := middlewares.NewHandlerSwitcher(router.BuildDefaultHTTPRouter())  
  
   next, err := alice.New(requestdecorator.WrapHandler(reqDecorator)).Then(httpSwitcher)  
   if err != nil {  
      return nil, err  
   }

pkg/middlewares/handler_switcher.go

// NewHandlerSwitcher builds a new instance of HTTPHandlerSwitcher.  
func NewHandlerSwitcher(newHandler http.Handler) (hs *HTTPHandlerSwitcher) {  
   return &HTTPHandlerSwitcher{  
      handler: safe.New(newHandler),  
   }  
}  
  
func (h *HTTPHandlerSwitcher) ServeHTTP(rw http.ResponseWriter, req *http.Request) {  
   handlerBackup := h.handler.Get().(http.Handler)  
   handlerBackup.ServeHTTP(rw, req)  
}  
  
// GetHandler returns the current http.ServeMux.  
func (h *HTTPHandlerSwitcher) GetHandler() (newHandler http.Handler) {  
   handler := h.handler.Get().(http.Handler)  
   return handler  
}  
  
// UpdateHandler safely updates the current http.ServeMux with a new one.func (h *HTTPHandlerSwitcher) UpdateHandler(newHandler http.Handler) {  
   h.handler.Set(newHandler)  
}

健康检查:确保可用性

Traefik 对自身和后端服务进行健康检查,以确保流量仅路由到运行中的实例。

自我健康检查

Traefik 通过 /ping 端点实现自我健康检查。外部系统可以调用此端点以确定 Traefik 的健康状态。

// Do try to do a healthcheck.func Do(staticConfiguration static.Configuration) (*http.Response, error) {  
   if staticConfiguration.Ping == nil {  
      return nil, errors.New("please enable `ping` to use health check")  
   }  
  
   ep := staticConfiguration.Ping.EntryPoint  
   if ep == "" {  
      ep = "traefik"  
   }  
  
   pingEntryPoint, ok := staticConfiguration.EntryPoints[ep]  
   if !ok {  
      return nil, fmt.Errorf("ping: missing %s entry point", ep)  
   }  
  
   client := &http.Client{Timeout: 5 * time.Second}  
   protocol := "http"  
  
   path := "/"  
  
   return client.Head(protocol + "://" + pingEntryPoint.GetAddress() + path + "ping")  
}

当 Traefik 运行时,此端点返回 200 OK 状态。失败的健康检查会导致非 200 状态代码,表明存在问题。

后端服务健康检查

Traefik 支持通过 HTTP、HTTPS 和 gRPC 协议对后端服务进行健康检查。可配置的健康检查使得能够评估后端服务器的可用性。

以下是 Traefik 如何进行后端服务健康检查的简要说明:

  1. 配置: 在服务配置中设置健康检查细节,例如健康检查路径、间隔和超时等参数。
  2. 健康检查器 (Health Checker): 为每个启用了健康检查的服务实例化一个 ServiceHealthChecker。
  3. 定期检查: 健康检查器根据间隔设置定期向后端服务器发送请求。
  4. 状态更新: 健康检查器根据健康检查响应更新每个服务器的状态。
  5. 负载均衡器更新: 负载均衡器会收到任何状态更改的通知,并在分发请求时予以考虑。

这是 ServiceHealthChecker 的 Go 代码片段:
pkg/healthcheck/healthcheck.go

func NewServiceHealthChecker(ctx context.Context, metrics metricsHealthCheck, config *dynamic.ServerHealthCheck, service StatusSetter, info *runtime.ServiceInfo, transport http.RoundTripper, targets map[string]*url.URL) *ServiceHealthChecker {  
   logger := log.Ctx(ctx)  
  
   interval := time.Duration(config.Interval)  
   if interval <= 0 {  
      logger.Error().Msg("Health check interval smaller than zero")  
      interval = time.Duration(dynamic.DefaultHealthCheckInterval)  
   }  
  
   timeout := time.Duration(config.Timeout)  
   if timeout <= 0 {  
      logger.Error().Msg("Health check timeout smaller than zero")  
      timeout = time.Duration(dynamic.DefaultHealthCheckTimeout)  
   }  
  
   client := &http.Client{  
      Transport: transport,  
   }  
  
   if config.FollowRedirects != nil && !*config.FollowRedirects {  
      client.CheckRedirect = func(req *http.Request, via []*http.Request) error {  
         return http.ErrUseLastResponse  
      }  
   }  
  
   return &ServiceHealthChecker{  
      balancer: service,  
      info:     info,  
      config:   config,  
      interval: interval,  
      timeout:  timeout,  
      targets:  targets,  
      client:   client,  
      metrics:  metrics,  
   }  
}  
  
func (shc *ServiceHealthChecker) Launch(ctx context.Context) {  
   ticker := time.NewTicker(shc.interval)  
   defer ticker.Stop()  
  
   for {  
      select {  
      case <-ctx.Done():  
         return  
  
      case <-ticker.C:  
         for proxyName, target := range shc.targets {  
            select {  
            case <-ctx.Done():  
               return  
            default:  
            }  
  
            up := true  
            serverUpMetricValue := float64(1)  
  
            if err := shc.executeHealthCheck(ctx, shc.config, target); err != nil {  
               // The context is canceled when the dynamic configuration is refreshed.  
               // 当动态配置刷新时,context 被取消。
               if errors.Is(err, context.Canceled) {  
                  return  
               }  
  
               log.Ctx(ctx).Warn().  
                  Str("targetURL", target.String()).  
                  Err(err).  
                  Msg("Health check failed.")  
  
               up = false  
               serverUpMetricValue = float64(0)  
            }  
  
            shc.balancer.SetStatus(ctx, proxyName, up)  
  
            statusStr := runtime.StatusDown  
            if up {  
               statusStr = runtime.StatusUp  
            }  
  
            shc.info.UpdateServerStatus(target.String(), statusStr)  
  
            shc.metrics.ServiceServerUpGauge().  
               With("service", proxyName, "url", target.String()).  
               Set(serverUpMetricValue)  
         }  
      }  
   }  
}  
  
func (shc *ServiceHealthChecker) executeHealthCheck(ctx context.Context, config *dynamic.ServerHealthCheck, target *url.URL) error {  
   ctx, cancel := context.WithDeadline(ctx, time.Now().Add(shc.timeout))  
   defer cancel()  
  
   if config.Mode == modeGRPC {  
      return shc.checkHealthGRPC(ctx, target)  
   }  
   return shc.checkHealthHTTP(ctx, target)  
}  
  
// checkHealthHTTP returns an error with a meaningful description if the health check failed.
// Dedicated to HTTP servers.  
func (shc *ServiceHealthChecker) checkHealthHTTP(ctx context.Context, target *url.URL) error {  
   req, err := shc.newRequest(ctx, target)  
   if err != nil {  
      return fmt.Errorf("create HTTP request: %w", err)  
   }  
  
   resp, err := shc.client.Do(req)  
   if err != nil {  
      return fmt.Errorf("HTTP request failed: %w", err)  
   }  
  
   defer resp.Body.Close()  
  
   if shc.config.Status == 0 && (resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusBadRequest) {  
      return fmt.Errorf("received error status code: %v", resp.StatusCode)  
   }  
  
   if shc.config.Status != 0 && shc.config.Status != resp.StatusCode {  
      return fmt.Errorf("received error status code: %v expected status code: %v", resp.StatusCode, shc.config.Status)  
   }  
  
   return nil  
}

这段代码展示了健康检查器发送健康检查请求并根据响应更新服务器状态。负载均衡器随后使用此信息将流量仅引导至响应正常的服务器。

中间件:增强路由过程

Traefik 的中间件系统允许将附加功能注入请求/响应生命周期。中间件可以链接到路由器或服务,以执行身份验证、速率限制和请求转换等任务。

Traefik 提供了广泛的内置中间件,并且可以通过插件集成自定义中间件。

中间件处理管道

以下概述了 Traefik 如何处理中间件:

  1. 配置: 中间件在动态配置中定义,并链接到路由器或服务。
  2. 中间件链: 根据配置构建中间件链,顺序决定了它们的执行顺序。
  3. 请求处理: 对进入的请求,在到达后端服务之前会遍历中间件链,允许每个中间件更改请求或生成响应。
  4. 响应处理: 后端服务响应后,响应会以相反的顺序通过中间件链回传,允许在到达客户端之前进行修改。

源码分析:中间件构建器 (Middleware Builder)

middleware.Builder 负责从配置构建中间件链。它提供了一个 BuildChain 函数,该函数接受上下文和中间件名称列表,返回一个 alice.Chain 对象。

这是 BuildChain 函数的简化版本:

pkg/server/middleware/middlewares.go

func (b *Builder) BuildChain(ctx context.Context, middlewares []string) *alice.Chain {  
   chain := alice.New()  
   for _, name := range middlewares {  
      middlewareName := provider.GetQualifiedName(ctx, name)  
  
      chain = chain.Append(func(next http.Handler) (http.Handler, error) {  
         constructorContext := provider.AddInContext(ctx, middlewareName)  
         if midInf, ok := b.configs[middlewareName]; !ok || midInf.Middleware == nil {  
            return nil, fmt.Errorf("middleware %q does not exist", middlewareName)  
         }  
  
         var err error  
         if constructorContext, err = checkRecursion(constructorContext, middlewareName); err != nil {  
            b.configs[middlewareName].AddError(err, true)  
            return nil, err  
         }  
  
         constructor, err := b.buildConstructor(constructorContext, middlewareName)  
         if err != nil {  
            b.configs[middlewareName].AddError(err, true)  
            return nil, err  
         }  
  
         handler, err := constructor(next)  
         if err != nil {  
            b.configs[middlewareName].AddError(err, true)  
            return nil, err  
         }  
  
         return handler, nil  
      })  
   }  
   return &chain  
}  
  
func checkRecursion(ctx context.Context, middlewareName string) (context.Context, error) {  
   currentStack, ok := ctx.Value(middlewareStackKey).([]string)  
   if !ok {  
      currentStack = []string{}  
   }  
   if inSlice(middlewareName, currentStack) {  
      return ctx, fmt.Errorf("could not instantiate middleware %s: recursion detected in %s", middlewareName, strings.Join(append(currentStack, middlewareName), "->"))  
   }  
   return context.WithValue(ctx, middlewareStackKey, append(currentStack, middlewareName)), nil  
}  
  
// it is the responsibility of the caller to make sure that b.configs[middlewareName].Middleware exists.  
func (b *Builder) buildConstructor(ctx context.Context, middlewareName string) (alice.Constructor, error) {  
   config := b.configs[middlewareName]  
   if config == nil || config.Middleware == nil {  
      return nil, fmt.Errorf("invalid middleware %q configuration", middlewareName)  
   }  
  
   var middleware alice.Constructor  
   badConf := errors.New("cannot create middleware: multi-types middleware not supported, consider declaring two different pieces of middleware instead")  
   // ...
   // Chain  
   if config.Chain != nil {  
      if middleware != nil {  
         return nil, badConf  
      }  
  
      var qualifiedNames []string  
      for _, name := range config.Chain.Middlewares {  
         qualifiedNames = append(qualifiedNames, provider.GetQualifiedName(ctx, name))  
      }  
      config.Chain.Middlewares = qualifiedNames  
      middleware = func(next http.Handler) (http.Handler, error) {  
         return chain.New(ctx, next, *config.Chain, b, middlewareName)  
      }  
   }  
   // ...
   if middleware == nil {  
      return nil, fmt.Errorf("invalid middleware %q configuration: invalid middleware type or middleware does not exist", middlewareName)  
   }  
  
   // The tracing middleware is a NOOP if tracing is not setup on the middleware chain.  
   // Hence, regarding internal resources' observability deactivation,   // this would not enable tracing.   return tracing.WrapMiddleware(ctx, middleware), nil  
}

此函数遍历中间件名称以构建构造函数链。每个构造函数负责创建一个中间件实例,并将其与链中的下一个处理程序集成。

buildConstructor 函数根据其配置为特定中间件创建构造函数,应用反射来识别中间件类型并实例化相关函数。

这种方法使 Traefik 能够支持具有不同配置的各种中间件,同时促进结构良好且模块化的代码库。

结论