文档章节

Kubernetes Nginx Ingress Controller源码分析

WaltonWang
 WaltonWang
发布于 2017/08/07 23:41
字数 4798
阅读 135
收藏 1
点赞 0
评论 0

main

controllers/nginx/pkg/cmd/controller/main.go:29

func main() {
	// start a new nginx controller
	ngx := newNGINXController()
	// create a custom Ingress controller using NGINX as backend
	ic := controller.NewIngressController(ngx)
	go handleSigterm(ic)
	// start the controller
	ic.Start()
	// wait
	glog.Infof("shutting down Ingress controller...")
	for {
		glog.Infof("Handled quit, awaiting pod deletion")
		time.Sleep(30 * time.Second)
	}
}
  • start a new nginx controller.
  • create a custom Ingress controller using NGINX as backend.
  • start the Ingress controller.

newNGINXController

controllers/nginx/pkg/cmd/controller/nginx.go:68

func newNGINXController() ingress.Controller {

    // 从环境变量“NGINX_BINARY”中获取nginx二进制文件的路径,如果没有该环境变量,则使用预设的默认值“/usr/sbin/nginx”
	ngx := os.Getenv("NGINX_BINARY")
	if ngx == "" {
		ngx = binary
	}

    // 从"/etc/resolv.conf"中读取dns nameservers的IP列表
	h, err := dns.GetSystemNameServers()
	...

    // 构造NGINXController对象
	n := &NGINXController{
		binary:        ngx,
		configmap:     &api_v1.ConfigMap{},
		
		// 查看 "/proc/net/if_inet6"文件是否存在,检查是否Enable IPv6
		isIPV6Enabled: isIPv6Enabled(),
		
		resolver:      h,
		proxy: &proxy{
		  
		  // 设置proxy的default server为 ”127.0.0.1:442”
			Default: &server{
				Hostname:      "localhost",
				IP:            "127.0.0.1",
				Port:          442,
				ProxyProtocol: true,
			},
		},
	}

    // 启动对 tcp/443 端口的监听
	listener, err := net.Listen("tcp", ":443")
	
	proxyList := &proxyproto.Listener{Listener: listener}

	// start goroutine that accepts tcp connections in port 443
	go func() {
		for {
			var conn net.Conn
			var err error

			if n.isProxyProtocolEnabled {
				// we need to wrap the listener in order to decode
				// proxy protocol before handling the connection
				conn, err = proxyList.Accept()
			} else {
				conn, err = listener.Accept()
			}

			if err != nil {
				glog.Warningf("unexpected error accepting tcp connection: %v", err)
				continue
			}

			glog.V(3).Infof("remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr())
			go n.proxy.Handle(conn)
		}
	}()

    // onChange定义nginx.tmpl文件内容发生变化时需要进行操作:重新生成nginx template instance,并赋给NGINXController
	var onChange func()
	onChange = func() {
		template, err := ngx_template.NewTemplate(tmplPath, onChange)
		...

		n.t.Close()
		n.t = template
		glog.Info("new NGINX template loaded")
	}

    // 根据nginx.tmpl生成nginx Template,并将onChange注册进去,通过FileWatcher监听nginx.tmpl的变化时,自动调用onChange。
	ngxTpl, err := ngx_template.NewTemplate(tmplPath, onChange)
	
	n.t = ngxTpl

    // start a new NGINX master process running in foreground.
	go n.Start()
 
	return ingress.Controller(n)
}
  • 从环境变量“NGINX_BINARY”中获取nginx二进制文件的路径,如果没有该环境变量,则使用预设的默认值“/usr/sbin/nginx”

  • 从"/etc/resolv.conf"中读取dns nameservers的IP列表

  • 构造NGINXController对象

    • 查看 "/proc/net/if_inet6"文件是否存在,检查是否Enable IPv6
    • 设置proxy的default server为 ”127.0.0.1:442”
  • 启动对 tcp/443 端口的监听

  • start goroutine that accepts tcp connections in port 443

  • onChange定义nginx.tmpl文件内容发生变化时需要进行操作:重新生成nginx template instance,并赋给NGINXController

  • 根据nginx.tmpl生成nginx Template,并将onChange注册进去,通过FileWatcher监听nginx.tmpl的变化时,自动调用onChange。

  • start a new NGINX master process running in foreground.

NGINXController的定义如下:

controllers/nginx/pkg/cmd/controller/nginx.go:156

type NGINXController struct {
	t *ngx_template.Template

	configmap *api_v1.ConfigMap

	storeLister ingress.StoreLister

	binary   string
	resolver []net.IP

	cmdArgs []string

	stats        *statsCollector
	statusModule statusModule

	// returns true if IPV6 is enabled in the pod
	isIPV6Enabled bool

	// returns true if proxy protocol es enabled
	isProxyProtocolEnabled bool

	proxy *proxy
}

NewTemplate时,我们可以看到根据nginx.tmpl来生成nginx配置时,注册了funcMap,从funcMap我们可以看出nginx.tpml主要包括哪些组成部分。

//controllers/nginx/pkg/template/template.go:57
func NewTemplate(file string, onChange func()) (*Template, error) {
	tmpl, err := text_template.New("nginx.tmpl").Funcs(funcMap).ParseFiles(file)
	
	fw, err := watch.NewFileWatcher(file, onChange)

	return &Template{
		tmpl:      tmpl,
		fw:        fw,
		s:         defBufferSize,
		tmplBuf:   bytes.NewBuffer(make([]byte, 0, defBufferSize)),
		outCmdBuf: bytes.NewBuffer(make([]byte, 0, defBufferSize)),
	}, nil
}

//controllers/nginx/pkg/template/template.go:123
funcMap = text_template.FuncMap{
	"empty": func(input interface{}) bool {
		check, ok := input.(string)
		if ok {
			return len(check) == 0
		}
		return true
	},
	"buildLocation":            buildLocation,
	"buildAuthLocation":        buildAuthLocation,
	"buildAuthResponseHeaders": buildAuthResponseHeaders,
	"buildProxyPass":           buildProxyPass,
	"buildRateLimitZones":      buildRateLimitZones,
	"buildRateLimit":           buildRateLimit,
	"buildResolvers":           buildResolvers,
	"buildUpstreamName":        buildUpstreamName,
	"isLocationAllowed":        isLocationAllowed,
	"buildLogFormatUpstream":   buildLogFormatUpstream,
	"buildDenyVariable":        buildDenyVariable,
	"getenv":                   os.Getenv,
	"contains":                 strings.Contains,
	"hasPrefix":                strings.HasPrefix,
	"hasSuffix":                strings.HasSuffix,
	"toUpper":                  strings.ToUpper,
	"toLower":                  strings.ToLower,
	"formatIP":                 formatIP,
	"buildNextUpstream":        buildNextUpstream,
}

controller.NewIngressController

core/pkg/ingress/controller/launch.go:29

// NewIngressController returns a configured Ingress controller
func NewIngressController(backend ingress.Controller) *GenericController {
    
    ...	

	flags.Parse(os.Args)
	
	// 对于Ningx而言,OverrideFlags主要处理ingress-class配置,如果ingress-class没有配置,则使用默认配置"nginx",并启动Prometheus Collector for the nginx。
	backend.OverrideFlags(flags)

	flag.Set("logtostderr", "true")

    ...
    
    // 如果配置了publish-service,则要检查该serivce的svc.Status.LoadBalancer.Ingress不为空。
	if *publishSvc != "" {
        ...
        
		if len(svc.Status.LoadBalancer.Ingress) == 0 {
			// We could poll here, but we instead just exit and rely on k8s to restart us
			glog.Fatalf("service %s does not (yet) have ingress points", *publishSvc)
		}

	}
	...

    // 创建目录 "/ingress-controller/ssl", 用于保存Ingress中定义的SSL certificates,文件名格式为<namespace>-<secret name>.pem
	err = os.MkdirAll(ingress.DefaultSSLDirectory, 0655)
	
	// 构造Ingress Controller的配置
	config := &Configuration{
		UpdateStatus:            *updateStatus,
		ElectionID:              *electionID,
		Client:                  kubeClient,
		ResyncPeriod:            *resyncPeriod,
		DefaultService:          *defaultSvc,
		IngressClass:            *ingressClass,
		DefaultIngressClass:     backend.DefaultIngressClass(),
		Namespace:               *watchNamespace,
		ConfigMapName:           *configMap,
		TCPConfigMapName:        *tcpConfigMapName,
		UDPConfigMapName:        *udpConfigMapName,
		DefaultSSLCertificate:   *defSSLCertificate,
		DefaultHealthzURL:       *defHealthzURL,
		PublishService:          *publishSvc,
		Backend:                 backend,
		ForceNamespaceIsolation: *forceIsolation,
		UpdateStatusOnShutdown:  *UpdateStatusOnShutdown,
		SortBackends:            *SortBackends,
	}

    // 创建 Nginx Ingress controller。
	ic := newIngressController(config)
	
	// 注册 "/debug/pprof"等handler,方便必要时进行性能调试.
	// 注册 "/stop" handler,以必要时stop nginx-ingress-controller.
	// 注册 "/metrics" handler,提供Prometheus收集监控数据。
	go registerHandlers(*profiling, *healthzPort, ic)
	
	return ic
}
  • 对于Ningx而言,OverrideFlags主要处理ingress-class配置,如果ingress-class没有配置,则使用默认配置"nginx",并启动Prometheus Collector for the nginx。
  • 如果配置了publish-service,则要检查该serivce的svc.Status.LoadBalancer.Ingress不为空。
  • 创建目录 "/ingress-controller/ssl", 用于保存Ingress中定义的SSL certificates,文件名格式为<namespace>-<secret name>.pem
  • 构造Ingress Controller的配置
  • 创建 Nginx Ingress controller。
  • 注册 "/debug/pprof"等handler,方便必要时进行性能调试.
  • 注册 "/stop" handler,以必要时stop nginx-ingress-controller.
  • 注册 "/metrics" handler,提供Prometheus收集监控数据。
core/pkg/ingress/controller/controller.go:149

// newIngressController creates an Ingress controller
func newIngressController(config *Configuration) *GenericController {

    // 创建事件广播器
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
		Interface: config.Client.Core().Events(config.Namespace),
	})

    // 构建GenericController
	ic := GenericController{
		cfg:             config,
		stopLock:        &sync.Mutex{},
		stopCh:          make(chan struct{}),
		
		// 设置syncRateLimiter的qps为0.3, burst为1.
		syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1),
		
		recorder: eventBroadcaster.NewRecorder(scheme.Scheme, api.EventSource{
			Component: "ingress-controller",
		}),
		
		// sslCertTracker holds a store of referenced Secrets in Ingress rules.
		sslCertTracker: newSSLCertTracker(),
	}

    // 创建Ingress的syncQueue,每往syncQueue插入一个Ingress对象,就会调用syncIngress一次。
	ic.syncQueue = task.NewTaskQueue(ic.syncIngress)

	// from here to the end of the method all the code is just boilerplate
	// required to watch Ingress, Secrets, ConfigMaps and Endoints.
	// This is used to detect new content, updates or removals and act accordingly
	
	// 定义Ingress Event Handler: Add, Delete, Update。
	ingEventHandler := cache.ResourceEventHandlerFuncs{
	
	   // 注册Ingress Add Event Handler.
		AddFunc: func(obj interface{}) {
			addIng := obj.(*extensions.Ingress)
			
			// 只处理Annotation ”kubernetes.io/ingress.class”满足条件的Ingress,条件必须满足其中之一:1. 如果Annotation为空,则要求--ingress-class设置的值为"nginx";2. Annotation与--ingress-class设置的值相同。
			if !class.IsValid(addIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
				a, _ := parser.GetStringAnnotation(class.IngressKey, addIng)
				glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
				return
			}
			ic.recorder.Eventf(addIng, api.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
			
			// 将满足条件的Ingress Object加入到syncQueue,如此便会触发调用ic.syncIngress来update and reload nginx config。
			ic.syncQueue.Enqueue(obj)
			
			// extracts information about secrets inside the Ingress rule, 如果该Ingress中的secret不在sslCertTracker cache中,则会调用ic.syncSecret将secret内容更新到对应的"/ingress-controller/ssl/<namespace>-<secret name>.pem"中。
			ic.extractSecretNames(addIng)
		},
		
		// 注册Ingress Delete Event Handler。
		DeleteFunc: func(obj interface{}) {
			delIng := obj.(*extensions.Ingress)
			
			// 同Add一样,只处理Annotation ”kubernetes.io/ingress.class”满足条件的Ingress。
			if !class.IsValid(delIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
				glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
				return
			}
			ic.recorder.Eventf(delIng, api.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
			
			// 将满足条件的Ingress Object加入到syncQueue。
			ic.syncQueue.Enqueue(obj)
		},
		
		// 注册Ingress Update Event Handler。
		UpdateFunc: func(old, cur interface{}) {
			oldIng := old.(*extensions.Ingress)
			curIng := cur.(*extensions.Ingress)
			validOld := class.IsValid(oldIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
			validCur := class.IsValid(curIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
			if !validOld && validCur {
				glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
				ic.recorder.Eventf(curIng, api.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
			} else if validOld && !validCur {
				glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
				ic.recorder.Eventf(curIng, api.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
			} else if validCur && !reflect.DeepEqual(old, cur) {
				ic.recorder.Eventf(curIng, api.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
			} else {
				// old and cur are invalid or old and cur doesn't have changes, so ignore
				return
			}
			
			// 将满足条件的Ingress Object加入到syncQueue。
			ic.syncQueue.Enqueue(cur)
			
			// 将Ingress中定义的Secret同步更新到对应的"/ingress-controller/ssl/<namespace>-<secret name>.pem"中。
			ic.extractSecretNames(curIng)
		},
	}


    // 定义Secret Event Handler: Add, Delete, Update。
	secrEventHandler := cache.ResourceEventHandlerFuncs{
	
	   // 注册Secret Add Event Handler。
		AddFunc: func(obj interface{}) {
			sec := obj.(*api.Secret)
			key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
			
			// checks if a secret is referenced or not by one or more Ingress rules
			if ic.secrReferenced(sec.Namespace, sec.Name) {
			
			     // 调用ic.syncSecret将secret内容更新到对应的"/ingress-controller/ssl/<namespace>-<secret name>.pem"中
				ic.syncSecret(key)
			}
		},
		
		// 注册Secret Update Event Handler。
		UpdateFunc: func(old, cur interface{}) {
		
		  // 判断old secret与current secret内容是否相同。
			if !reflect.DeepEqual(old, cur) {
				sec := cur.(*api.Secret)
				key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
				
				// 如果不同,则调用ic.syncSecret同步到pem文件中。
				ic.syncSecret(key)
			}
		},
		
		// 注册Secret Delete Event Handler。
		DeleteFunc: func(obj interface{}) {
			sec := obj.(*api.Secret)
			key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
			
			// 从sslCertTracker cache中删除对应的secret
			ic.sslCertTracker.DeleteAll(key)
		},
	}

    // 定义通用event handler(给Endpoint Object使用),对于Add/Delete/Update都将object插入到syncQueue.
	eventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ic.syncQueue.Enqueue(obj)
		},
		DeleteFunc: func(obj interface{}) {
			ic.syncQueue.Enqueue(obj)
		},
		UpdateFunc: func(old, cur interface{}) {
			if !reflect.DeepEqual(old, cur) {
				ic.syncQueue.Enqueue(cur)
			}
		},
	}


    // 定义ConfigMap Event Handler: Add, Delete, Update。
	mapEventHandler := cache.ResourceEventHandlerFuncs{
	
	   // 注册ConfigMap Add Event Handler。
		AddFunc: func(obj interface{}) {
			upCmap := obj.(*api.ConfigMap)
			mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
			
			// 判断该ConfigMap的namespace/configMapName是都匹配--configmap的Name。
			if mapKey == ic.cfg.ConfigMapName {
				glog.V(2).Infof("adding configmap %v to backend", mapKey)
				
				// 如果匹配,则将该configmap内容设置给后端nginx,并设置reloadRequired为true(syncIngress中会判断reloadRequired是否为true,才执行对应的OnUpdate对nginx config更新并reload。)
				ic.cfg.Backend.SetConfig(upCmap)
				ic.reloadRequired = true
			}
		},
		
		// 注册ConfigMap Update Event Handler。
		UpdateFunc: func(old, cur interface{}) {
			if !reflect.DeepEqual(old, cur) {
				upCmap := cur.(*api.ConfigMap)
				mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
				
				// 判断该ConfigMap的namespace/configMapName是都匹配--configmap的Name
				if mapKey == ic.cfg.ConfigMapName {
					glog.V(2).Infof("updating configmap backend (%v)", mapKey)
					
					// 如果匹配,则将该configmap内容设置给后端nginx,并设置reloadRequired为true(syncIngress中会判断reloadRequired是否为true,才执行对应的OnUpdate对nginx config更新并reload。)
					ic.cfg.Backend.SetConfig(upCmap)
					ic.reloadRequired = true
				}
				
				// updates to configuration configmaps can trigger an update
				if mapKey == ic.cfg.ConfigMapName || mapKey == ic.cfg.TCPConfigMapName || mapKey == ic.cfg.UDPConfigMapName {
					ic.recorder.Eventf(upCmap, api.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
					
					// 将更新的configmap插入到syncQueue,触发调用syncIngress。
					ic.syncQueue.Enqueue(cur)
				}
			}
		},
	}

    //  如果启用了--force-namespace-isolation,并且配置--watch-namespace不是“”(表示all namespace),则watchNs为--watch-namespace配置的值,否则其他任何情况,wathNs都为all.
    // 注意watchNs只表示对Secret和ConfigMap的watch namesapce。
	watchNs := api.NamespaceAll
	if ic.cfg.ForceNamespaceIsolation && ic.cfg.Namespace != api.NamespaceAll {
		watchNs = ic.cfg.Namespace
	}

    // 对ingress,endpoint,service而言,watch的namespace就是--watch-namespace配置的值。
	ic.ingLister.Store, ic.ingController = cache.NewInformer(
		cache.NewListWatchFromClient(ic.cfg.Client.Extensions().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()),
		&extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler)

	ic.endpLister.Store, ic.endpController = cache.NewInformer(
		cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()),
		&api.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)

	ic.secrLister.Store, ic.secrController = cache.NewInformer(
		cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "secrets", watchNs, fields.Everything()),
		&api.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)

	ic.mapLister.Store, ic.mapController = cache.NewInformer(
		cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "configmaps", watchNs, fields.Everything()),
		&api.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)

	ic.svcLister.Store, ic.svcController = cache.NewInformer(
		cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()),
		&api.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})

    // 对node对象而言,watch的namespace当然始终是all namespace。
	ic.nodeLister.Store, ic.nodeController = cache.NewInformer(
		cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "nodes", api.NamespaceAll, fields.Everything()),
		&api.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})

    // --update-status配置默认为true,也就是说默认会通过status.NewStatusSyncer来配置ingress controller的syncStatus接口。syncStatus用来returns a list of IP addresses and/or FQDN where the ingress controller is currently running。
	if config.UpdateStatus {
		ic.syncStatus = status.NewStatusSyncer(status.Config{
			Client:                 config.Client,
			PublishService:         ic.cfg.PublishService,
			IngressLister:          ic.ingLister,
			ElectionID:             config.ElectionID,
			IngressClass:           config.IngressClass,
			DefaultIngressClass:    config.DefaultIngressClass,
			UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
		})
	} else {
		glog.Warning("Update of ingress status is disabled (flag --update-status=false was specified)")
	}

	ic.annotations = newAnnotationExtractor(ic)

    // 将Ingress, Service, Node, Endpoint, Secret, ConfigMap的Lister赋值给nginx ingress controller的Lister。
	ic.cfg.Backend.SetListers(ingress.StoreLister{
		Ingress:   ic.ingLister,
		Service:   ic.svcLister,
		Node:      ic.nodeLister,
		Endpoint:  ic.endpLister,
		Secret:    ic.secrLister,
		ConfigMap: ic.mapLister,
	})

	return &ic
}
  • 创建事件广播器
  • 构建GenericController
  • 创建Ingress的syncQueue,每往syncQueue插入一个Ingress对象,就会调用syncIngress一次。
  • 定义Ingress Event Handler: Add, Delete, Update。
    • 注册Ingress Add Event Handler.
      • 只处理Annotation ”kubernetes.io/ingress.class”满足条件的Ingress,条件必须满足其中之一:1. 如果Annotation为空,则要求--ingress-class设置的值为"nginx";2. Annotation与--ingress-class设置的值相同。
      • 将满足条件的Ingress Object加入到syncQueue,如此便会触发调用ic.syncIngress来update and reload nginx config。
      • extracts information about secrets inside the Ingress rule, 如果该Ingress中的secret不在sslCertTracker cache中,则会调用ic.syncSecret将secret内容更新到对应的"/ingress-controller/ssl/<namespace>-<secret name>.pem"中。
    • 注册Ingress Delete Event Handler。
      • 同Add一样,只处理Annotation ”kubernetes.io/ingress.class”满足条件的Ingress。
      • 将满足条件的Ingress Object加入到syncQueue。
    • 注册Ingress Update Event Handler。
      • 将满足条件的Ingress Object加入到syncQueue。
      • 将Ingress中定义的Secret同步更新到对应的"/ingress-controller/ssl/<namespace>-<secret name>.pem"中。
  • 定义Secret Event Handler: Add, Delete, Update。
    • 注册Secret Add Event Handler。
      • checks if a secret is referenced or not by one or more Ingress rules
        • 调用ic.syncSecret将secret内容更新到对应的"/ingress-controller/ssl/<namespace>-<secret name>.pem"中
    • 注册Secret Update Event Handler。
      • 判断old secret与current secret内容是否相同。 - 如果不同,则调用ic.syncSecret同步到pem文件中
    • 注册Secret Delete Event Handler。
      • 从sslCertTracker cache中删除对应的secret
  • 定义通用event handler(给Endpoint Object使用),对于Add/Delete/Update都将object插入到syncQueue.
  • 定义ConfigMap Event Handler: Add, Delete, Update。
    • 注册ConfigMap Add Event Handler。
      • 判断该ConfigMap的namespace/configMapName是都匹配--configmap的Name。
        • 如果匹配,则将该configmap内容设置给后端nginx,并设置reloadRequired为true(syncIngress中会判断reloadRequired是否为true,才执行对应的OnUpdate对nginx config更新并reload。)
    • 注册ConfigMap Update Event Handler。 - 判断该ConfigMap的namespace/configMapName是都匹配--configmap的Name - 如果匹配,则将该configmap内容设置给后端nginx,并设置reloadRequired为true(syncIngress中会判断reloadRequired是否为true,才执行对应的OnUpdate对nginx config更新并reload。) - updates to configuration configmaps can trigger an update - 将更新的configmap插入到syncQueue,触发调用syncIngress。
  • 如果启用了--force-namespace-isolation,并且配置--watch-namespace不是“”(表示all namespace),则watchNs为--watch-namespace配置的值,否则其他任何情况,wathNs都为all.
  • 注意watchNs只表示对Secret和ConfigMap的watch namesapce。
  • 对ingress,endpoint,service而言,watch的namespace就是--watch-namespace配置的值。
  • 对node对象而言,watch的namespace当然始终是all namespace。
  • --update-status配置默认为true,也就是说默认会通过status.NewStatusSyncer来配置ingress controller的syncStatus接口。syncStatus用来returns a list of IP addresses and/or FQDN where the ingress controller is currently running。
  • 将Ingress, Service, Node, Endpoint, Secret, ConfigMap的Lister赋值给nginx ingress controller的Lister。

每次插入一个Object到ic.syncQueue,都会触发调用一次syncIngress,从上面的代码分析,可得出以下情况会往ic.syncQueue插入Object:

  • Ingress Add/Delete/Update Event Handler
  • Endpoint Add/Delete/Update Event Handler
  • ConfigMap Update Event Handler

上面也提到,syncIngress会update and reload nginx config,具体的逻辑见如下代码。

core/pkg/ingress/controller/controller.go:378

// sync collects all the pieces required to assemble the configuration file and
// then sends the content to the backend (OnUpdate) receiving the populated
// template as response reloading the backend if is required.
func (ic *GenericController) syncIngress(key interface{}) error {
	ic.syncRateLimiter.Accept()

	if ic.syncQueue.IsShuttingDown() {
		return nil
	}

    // getBackendServers returns a list of Upstream and Server to be used by the backend. An upstream can be used in multiple servers if the namespace, service name and port are the same
	upstreams, servers := ic.getBackendServers()
	var passUpstreams []*ingress.SSLPassthroughBackend

    // 构建passUpstreams部分
	for _, server := range servers {
		if !server.SSLPassthrough {
			continue
		}

		for _, loc := range server.Locations {
			if loc.Path != rootLocation {
				glog.Warningf("ignoring path %v of ssl passthrough host %v", loc.Path, server.Hostname)
				continue
			}
			passUpstreams = append(passUpstreams, &ingress.SSLPassthroughBackend{
				Backend:  loc.Backend,
				Hostname: server.Hostname,
				Service:  loc.Service,
				Port:     loc.Port,
			})
			break
		}
	}

    // 构建当前nginx config。
	pcfg := ingress.Configuration{
		Backends:            upstreams,
		Servers:             servers,
		TCPEndpoints:        ic.getStreamServices(ic.cfg.TCPConfigMapName, api.ProtocolTCP),
		UDPEndpoints:        ic.getStreamServices(ic.cfg.UDPConfigMapName, api.ProtocolUDP),
		PassthroughBackends: passUpstreams,
	}

    // 如果ic.reloadRequired为false,并且nginx config内容不变,则跳过nginx reload,流程结束。
	if !ic.reloadRequired && (ic.runningConfig != nil && ic.runningConfig.Equal(&pcfg)) {
		glog.V(3).Infof("skipping backend reload (no changes detected)")
		return nil
	}

	glog.Infof("backend reload required")

    // 否则,调用NGINXController.OnUpdate对nginx config进行reload.
	err := ic.cfg.Backend.OnUpdate(pcfg)
	if err != nil {
		incReloadErrorCount()
		glog.Errorf("unexpected failure restarting the backend: \n%v", err)
		return err
	}

	ic.reloadRequired = false
	glog.Infof("ingress backend successfully reloaded...")
	incReloadCount()
	setSSLExpireTime(servers)

    // 更新ic.runningConfig
	ic.runningConfig = &pcfg

	return nil
}
  • getBackendServers returns a list of Upstream and Server to be used by the backend. An upstream can be used in multiple servers if the namespace,
  • 构建passUpstreams部分
  • 构建当前nginx config。
  • 如果ic.reloadRequired为false,并且nginx config内容不变,则跳过nginx reload,流程结束。
  • 否则,调用NGINXController.OnUpdate对nginx config进行reload.
  • 更新ic.runningConfig

从前面的代码分析,可得出以下情况会调用ic.syncSecret将secret内容更新到对应的"/ingress-controller/ssl/<namespace>-<secret name>.pem"中。

  • Ingress Add Event Handler
  • Secret Add/Update Event Handler

ic.syncSecret具体的逻辑见如下代码。

core/pkg/ingress/controller/backend_ssl.go:38

// syncSecret keeps in sync Secrets used by Ingress rules with the files on
// disk to allow copy of the content of the secret to disk to be used
// by external processes.
func (ic *GenericController) syncSecret(key string) {
	glog.V(3).Infof("starting syncing of secret %v", key)

	var cert *ingress.SSLCert
	var err error

    // 调用ic.getPemCertificate receives a secret, and creates a ingress.SSLCert as return. It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only.
	cert, err = ic.getPemCertificate(key)
	if err != nil {
		glog.Warningf("error obtaining PEM from secret %v: %v", key, err)
		return
	}

	// create certificates and add or update the item in the store(sslCertTracker)
	cur, exists := ic.sslCertTracker.Get(key)
	if exists {
		s := cur.(*ingress.SSLCert)
		if reflect.DeepEqual(s, cert) {
			// no need to update
			return
		}
		glog.Infof("updating secret %v in the local store", key)
		ic.sslCertTracker.Update(key, cert)
		ic.reloadRequired = true
		return
	}

	glog.Infof("adding secret %v to the local store", key)
	ic.sslCertTracker.Add(key, cert)
	ic.reloadRequired = true
}
  • 调用ic.getPemCertificate receives a secret, and creates a ingress.SSLCert as return. It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only.
  • create certificates and add or update the item in the store(sslCertTracker)
  • 设置 ic.reloadRequired = true

在newIngressController中通过NewStatusSyncer给ingress Controller创建了一个StatusSyncer。 --update-status配置默认为true,也就是说默认会通过status.NewStatusSyncer来配置ingress controller的syncStatus接口。syncStatus用来returns a list of IP addresses and/or FQDN where the ingress controller is currently running。

core/pkg/ingress/status/status.go:186

// NewStatusSyncer returns a new Sync instance
func NewStatusSyncer(config Config) Sync {
	pod, err := k8s.GetPodDetails(config.Client)
	if err != nil {
		glog.Fatalf("unexpected error obtaining pod information: %v", err)
	}

	st := statusSync{
		pod:     pod,
		runLock: &sync.Mutex{},
		Config:  config,
	}
	
	// 每往st.syncQueue中插入一个Object,都会触发调用st.sync。
	st.syncQueue = task.NewCustomTaskQueue(st.sync, st.keyfunc)

	// we need to use the defined ingress class to allow multiple leaders
	// in order to update information about ingress status
	id := fmt.Sprintf("%v-%v", config.ElectionID, config.DefaultIngressClass)
	if config.IngressClass != "" {
		id = fmt.Sprintf("%v-%v", config.ElectionID, config.IngressClass)
	}

    // 选举leader
	le, err := NewElection(id,
		pod.Name, pod.Namespace, 30*time.Second,
		st.callback, config.Client)
	if err != nil {
		glog.Fatalf("unexpected error starting leader election: %v", err)
	}
	st.elector = le
	return st
}


core/pkg/ingress/status/status.go:146
func (s *statusSync) sync(key interface{}) error {
	s.runLock.Lock()
	defer s.runLock.Unlock()

	if s.syncQueue.IsShuttingDown() {
		glog.V(2).Infof("skipping Ingress status update (shutting down in progress)")
		return nil
	}

    // skipping Ingress status update (I am not the current leader)
	if !s.elector.IsLeader() {
		glog.V(2).Infof("skipping Ingress status update (I am not the current leader)")
		return nil
	}

    // runningAddresses returns a list of IP addresses and/or FQDN where the ingress controller is currently running.
	addrs, err := s.runningAddresses()
	if err != nil {
		return err
	}
	
	// 将新Ip更新到对应的ingress.status中(Ingress.Status.LoadBalancer.Ingress = newIPs)
	s.updateStatus(sliceToStatus(addrs))

	return nil
}
core/pkg/ingress/status/status.go:218

// runningAddresses returns a list of IP addresses and/or FQDN where the
// ingress controller is currently running
func (s *statusSync) runningAddresses() ([]string, error) {

    // 如果配置了PublishService,则获取PublishService的`svc.Status.LoadBalancer.Ingress`作为currently running ingress controller的IP/FQDN
	if s.PublishService != "" {
		ns, name, _ := k8s.ParseNameNS(s.PublishService)
		svc, err := s.Client.Core().Services(ns).Get(name, meta_v1.GetOptions{})
		if err != nil {
			return nil, err
		}

		addrs := []string{}
		for _, ip := range svc.Status.LoadBalancer.Ingress {
			if ip.IP == "" {
				addrs = append(addrs, ip.Hostname)
			} else {
				addrs = append(addrs, ip.IP)
			}
		}

		return addrs, nil
	}

	// get information about all the pods running the ingress controller
	pods, err := s.Client.Core().Pods(s.pod.Namespace).List(meta_v1.ListOptions{
		LabelSelector: labels.SelectorFromSet(s.pod.Labels).String(),
	})
	if err != nil {
		return nil, err
	}

    // 如果没有配置PublishService,则返回各个pod所在的node IP组成的Array。
	addrs := []string{}
	for _, pod := range pods.Items {
		name := k8s.GetNodeIP(s.Client, pod.Spec.NodeName)
		if !strings.StringInSlice(name, addrs) {
			addrs = append(addrs, name)
		}
	}
	return addrs, nil
}

StatusSyncer的逻辑为:

  • 每往st.syncQueue中插入一个Object,都会触发调用st.sync。sync的流程如下:
    • skipping Ingress status update (I am not the current leader)
    • runningAddresses returns a list of IP addresses and/or FQDN where the ingress controller is currently running. runningAddresses的流程如下:
      • 如果配置了PublishService,则获取PublishService的svc.Status.LoadBalancer.Ingress作为currently running ingress controller的IP/FQDN
      • 如果没有配置PublishService,则返回各个pod所在的node IP组成的Array。
    • 将新Ip更新到对应的ingress.status中(Ingress.Status.LoadBalancer.Ingress = newIPs)
  • 选举leader

GenericController.Start

core/pkg/ingress/controller/controller.go:1237

// Start starts the Ingress controller.
func (ic GenericController) Start() {
	glog.Infof("starting Ingress controller")

    // 分别启动goruntine对ingress,endpoint,service,node,secret,configmap进行listwatch。
	go ic.ingController.Run(ic.stopCh)
	go ic.endpController.Run(ic.stopCh)
	go ic.svcController.Run(ic.stopCh)
	go ic.nodeController.Run(ic.stopCh)
	go ic.secrController.Run(ic.stopCh)
	go ic.mapController.Run(ic.stopCh)

	// Wait for all involved caches to be synced, before processing items from the queue is started
	if !cache.WaitForCacheSync(ic.stopCh,
		ic.ingController.HasSynced,
		ic.svcController.HasSynced,
		ic.endpController.HasSynced,
		ic.secrController.HasSynced,
		ic.mapController.HasSynced,
		ic.nodeController.HasSynced,
	) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
	}

    // 启动goruntine,每次执行完worker(在newIngressController时通过ic.syncQueue = task.NewTaskQueue(ic.syncIngress)注册worker为ic.syncIngress),等待10s,再次执行worker,如此循环,直到收到stopCh
	go ic.syncQueue.Run(10*time.Second, ic.stopCh)

    // 如果配置了--update-status(默认为true),则启动goruntine执行statusSync.Run,starts the loop to keep the status in sync.
	if ic.syncStatus != nil {
		go ic.syncStatus.Run(ic.stopCh)
	}

	<-ic.stopCh
}
  • 分别启动goruntine对ingress,endpoint,service,node,secret,configmap进行listwatch。
  • Wait for all involved caches to be synced, before processing items from the queue is started.
  • 启动goruntine,每次执行完worker(在newIngressController时通过ic.syncQueue = task.NewTaskQueue(ic.syncIngress)注册worker为ic.syncIngress),等待10s,再次执行worker,如此循环,直到收到stopCh.
  • 关于ic.syncIngress的代码分析在前面小节中已经分析过了。
  • 如果配置了--update-status(默认为true),则启动goruntine执行statusSync.Run,starts the loop to keep the status in sync.

对于statysSync.Run的代码,我们有必要进一步分析:

core/pkg/ingress/status/status.go:86

func (s statusSync) Run(stopCh <-chan struct{}) {

    // 启动goruntine starts the leader election loop。
	go wait.Forever(s.elector.Run, 0)
	
	// 启动goruntine,每隔30s往statusSync.syncQueue中插入dummy object,强制触发ingress status sync。
	go s.run()

    // 启动goruntine,每次执行完worker(在NewStatusSyncer时通过st.syncQueue = task.NewCustomTaskQueue(st.sync, st.keyfunc)注册worker为st.sync),等待1s,再次执行worker,如此循环,直到收到stopCh。
	go s.syncQueue.Run(time.Second, stopCh)

	<-stopCh
}


core/pkg/ingress/status/status.go:132
func (s *statusSync) run() {
	err := wait.PollInfinite(updateInterval, func() (bool, error) {
		if s.syncQueue.IsShuttingDown() {
			return true, nil
		}
		// send a dummy object to the queue to force a sync
		s.syncQueue.Enqueue("dummy")
		return false, nil
	})
	if err != nil {
		glog.Errorf("error waiting shutdown: %v", err)
	}
}
  • 启动goruntine starts the leader election loop。
  • 启动goruntine,每隔30s往statusSync.syncQueue中插入dummy object,强制触发ingress status sync。
  • 启动goruntine,每次执行完worker(在NewStatusSyncer时通过st.syncQueue = task.NewCustomTaskQueue(st.sync, st.keyfunc)注册worker为st.sync),等待1s,再次执行worker,如此循环,直到收到stopCh。
  • 对于st.sync的源码分析在前面小节中已经分析过了。

© 著作权归作者所有

共有 人打赏支持
WaltonWang
粉丝 161
博文 92
码字总数 187622
作品 0
深圳
程序员
Kubernetes Ingress 模块初探

Ingress是Kubernetes中用于对Service进行公网负载均衡的一个模块,要使用ingress资源,必须有对应的ingress controller来管控ingress。业务需要实现定制化的ingress controller,将ingress资...

bellengao
06/24
0
0
玩转Kubebetes TCP Ingress

问题 在Kuberetes应用中,一般都是通过Ingress来暴露HTTP/HTTPS的服务。但是在实际应用中,还是有不少应用是TCP长连接的,这个是否也是可以通过Ingress来暴露呢?大家知道Kubernetes社区默认...

了哥-duff
06/21
0
0
在阿里云容器服务上通过Helm部署Ingress Controller

在 Kubernetes Ingress 高可靠部署最佳实践 中介绍了在Kubernetes集群中如何部署一套高可靠的Ingress接入层,文中通过直接修改YAML的方式来完成,今天主要分享下如何通过Helm的方式在阿里云容...

chenqz
04/22
0
0
K8s反向代理负载均衡组件ingress

K8s反向代理负载均衡组件ingress 参考文档 https://github.com/kubernetes/ingress/tree/master/examples https://mritd.me/2017/03/04/how-to-use-nginx-ingress/ http://www.dockerinfo.n......

minminmsn
2017/03/08
0
0
kubernetes之Ingress部署

1,如何访问K8S中的服务: 1,Ingress介绍 Kubernetes 暴露服务的方式目前只有三种:LoadBlancer Service、NodePort Service、Ingress;前两种估计都应该很熟悉,下面详细的了解下这个 Ingr...

Flywithmeto
07/02
0
0
Kubernetes负载均衡器-Nginx ingress安装

安装Nginx ingress Nginx ingress 使用ConfigMap来管理Nginx配置,nginx是大家熟知的代理和负载均衡软件,比起Traefik来说功能更加强大. 我们使用helm来部署,chart保存在私有的仓库中,请确...

openthings
04/10
0
0
从零开始搭建Kubernetes集群(五、搭建K8S Ingress)

一、前言 上一文《从零开始搭建Kubernetes集群(四、搭建K8S Dashboard)》介绍了如何搭建Dashboard。本篇将介绍如何搭建Ingress来访问K8S集群的Service。 二、Ingress简介 Ingress是个什么鬼...

宅楠军
05/14
0
0
手把手教你构建 Kubernetes 1.8 + Flannel 网络(三)

关于 Kubernetes 1.8 + Flannel 的分享今天就完成了,需要完整资料和代码(包括所需的包)的看文章底部的领取方式。 六.Docker-registry web 私有仓库构建 环境说明: 我们选取 master 192.1...

51reboot
01/05
0
0
使用ExternalDNS自动化DNS配置

Kubernetes社区的生态繁荣和该领域技术的快速茁壮发展,已经是众所周知。Kubernetes领域有太多强大的、创新的技术产品,而最近引起我注意的项目是ExternalDNS。这是在近期的POC期间客户主动咨...

RancherLabs
昨天
0
0
升级到Kubernetes1.8.4的配置细节差异以及k8s几个不常见的坑

kubernetes已经发布了1.8,今天需要在一个新机房部署k8s环境,于是决定尝试最新版本1.8,部署过程中故意和以前的部署步骤有些不同,故而出现了一些问题,并且发现k8s这有个新版本本身几个差异...

cleverfoxloving
2017/11/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

实现异步有哪些方法

有哪些方法可以实现异步呢? 方式一:java 线程池 示例: @Test public final void test_ThreadPool() throws InterruptedException { ScheduledThreadPoolExecutor scheduledThre......

黄威
今天
0
0
linux服务器修改mtu值优化cpu

一、jumbo frames 相关 1、什么是jumbo frames Jumbo frames 是指比标准Ethernet Frames长的frame,即比1518/1522 bit大的frames,Jumbo frame的大小是每个设备厂商规定的,不属于IEEE标准;...

六库科技
今天
0
0
牛客网刷题

1. 二维数组中的查找(难度:易) 题目描述 在一个二维数组中(每个一维数组的长度相同),每一行都按照从左到右递增的顺序排序,每一列都按照从上到下递增的顺序排序。请完成一个函数,输入...

大不了敲一辈子代码
今天
0
0
linux系统的任务计划、服务管理

linux任务计划cron 在linux下,有时候要在我们不在的时候执行一项命令,或启动一个脚本,可以使用任务计划cron功能。 任务计划要用crontab命令完成 选项: -u 指定某个用户,不加-u表示当前用...

黄昏残影
昨天
0
0
设计模式:单例模式

单例模式的定义是确保某个类在任何情况下都只有一个实例,并且需要提供一个全局的访问点供调用者访问该实例的一种模式。 实现以上模式基于以下必须遵守的两点: 1.构造方法私有化 2.提供一个...

人觉非常君
昨天
0
0
《Linux Perf Master》Edition 0.4 发布

在线阅读:https://riboseyim.gitbook.io/perf 在线阅读:https://www.gitbook.com/book/riboseyim/linux-perf-master/details 百度网盘【pdf、mobi、ePub】:https://pan.baidu.com/s/1C20T......

RiboseYim
昨天
1
0
conda 换源

https://mirrors.tuna.tsinghua.edu.cn/help/anaconda/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/conda config --add channels https://mir......

阿豪boy
昨天
1
0
Confluence 6 安装补丁类文件

Atlassian 支持或者 Atlassian 缺陷修复小组可能针对有一些关键问题会提供补丁来解决这些问题,但是这些问题还没有放到下一个更新版本中。这些问题将会使用 Class 类文件同时在官方 Jira bug...

honeymose
昨天
0
0
非常实用的IDEA插件之总结

1、Alibaba Java Coding Guidelines 经过247天的持续研发,阿里巴巴于10月14日在杭州云栖大会上,正式发布众所期待的《阿里巴巴Java开发规约》扫描插件!该插件由阿里巴巴P3C项目组研发。P3C...

Gibbons
昨天
1
0
Tomcat介绍,安装jdk,安装tomcat,配置Tomcat监听80端口

Tomcat介绍 Tomcat是Apache软件基金会(Apache Software Foundation)的Jakarta项目中的一个核心项目,由Apache、Sun和其他一些公司及个人共同开发而成。 java程序写的网站用tomcat+jdk来运行...

TaoXu
昨天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部