diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc
index a8bf8fe9b7e3..0ce8708f0380 100644
--- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc
+++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc
@@ -48,3 +48,4 @@
 - Update `install` command to perform enroll before starting Elastic Agent {pull}21772[21772]
 - Update `fleet.kibana.path` from a POLICY_CHANGE {pull}21804[21804]
 - Removed `install-service.ps1` and `uninstall-service.ps1` from Windows .zip packaging {pull}21694[21694]
+- Add `priority` to `AddOrUpdate` on dynamic composable input providers communication channel {pull}22352[22352]
diff --git a/x-pack/elastic-agent/pkg/composable/controller.go b/x-pack/elastic-agent/pkg/composable/controller.go
index 50cc9b69ead1..cb629f4c7e99 100644
--- a/x-pack/elastic-agent/pkg/composable/controller.go
+++ b/x-pack/elastic-agent/pkg/composable/controller.go
@@ -214,6 +214,7 @@ func (c *contextProviderState) Current() map[string]interface{} {
 }
 
 type dynamicProviderMapping struct {
+	priority   int
 	mapping    map[string]interface{}
 	processors transpiler.Processors
 }
@@ -228,7 +229,11 @@ type dynamicProviderState struct {
 }
 
 // AddOrUpdate adds or updates the current mapping for the dynamic provider.
-func (c *dynamicProviderState) AddOrUpdate(id string, mapping map[string]interface{}, processors []map[string]interface{}) error {
+//
+// `priority` ensures that order is maintained when adding the mapping to the current state
+// for the processor. Lower priority mappings will always be sorted before higher priority mappings
+// to ensure that matching of variables occurs on the lower priority mappings first.
+func (c *dynamicProviderState) AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error {
 	var err error
 	mapping, err = cloneMap(mapping)
 	if err != nil {
@@ -252,6 +257,7 @@ func (c *dynamicProviderState) AddOrUpdate(id string, mapping map[string]interfa
 		return nil
 	}
 	c.mappings[id] = dynamicProviderMapping{
+		priority:   priority,
 		mapping:    mapping,
 		processors: processors,
 	}
@@ -276,14 +282,24 @@ func (c *dynamicProviderState) Mappings() []dynamicProviderMapping {
 	c.lock.RLock()
 	defer c.lock.RUnlock()
 
+	// add the mappings sorted by (priority,id)
 	mappings := make([]dynamicProviderMapping, 0)
-	ids := make([]string, 0)
-	for name := range c.mappings {
-		ids = append(ids, name)
+	priorities := make([]int, 0)
+	for _, mapping := range c.mappings {
+		priorities = addToSet(priorities, mapping.priority)
 	}
-	sort.Strings(ids)
-	for _, name := range ids {
-		mappings = append(mappings, c.mappings[name])
+	sort.Ints(priorities)
+	for _, priority := range priorities {
+		ids := make([]string, 0)
+		for name, mapping := range c.mappings {
+			if mapping.priority == priority {
+				ids = append(ids, name)
+			}
+		}
+		sort.Strings(ids)
+		for _, name := range ids {
+			mappings = append(mappings, c.mappings[name])
+		}
 	}
 	return mappings
 }
@@ -319,3 +335,12 @@ func cloneMapArray(source []map[string]interface{}) ([]map[string]interface{}, e
 	}
 	return dest, nil
 }
+
+func addToSet(set []int, i int) []int {
+	for _, j := range set {
+		if j == i {
+			return set
+		}
+	}
+	return append(set, i)
+}
diff --git a/x-pack/elastic-agent/pkg/composable/dynamic.go b/x-pack/elastic-agent/pkg/composable/dynamic.go
index 33fbaca5a527..24c272a322fa 100644
--- a/x-pack/elastic-agent/pkg/composable/dynamic.go
+++ b/x-pack/elastic-agent/pkg/composable/dynamic.go
@@ -18,7 +18,11 @@ type DynamicProviderComm interface {
 	context.Context
 
 	// AddOrUpdate updates a mapping with given ID with latest mapping and processors.
-	AddOrUpdate(id string, mapping map[string]interface{}, processors []map[string]interface{}) error
+	//
+	// `priority` ensures that order is maintained when adding the mapping to the current state
+	// for the processor. Lower priority mappings will always be sorted before higher priority mappings
+	// to ensure that matching of variables occurs on the lower priority mappings first.
+	AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error
 	// Remove removes a mapping by given ID.
 	Remove(id string)
 }
diff --git a/x-pack/elastic-agent/pkg/composable/providers/docker/docker.go b/x-pack/elastic-agent/pkg/composable/providers/docker/docker.go
index dee8b96bb88e..902ceb7c8324 100644
--- a/x-pack/elastic-agent/pkg/composable/providers/docker/docker.go
+++ b/x-pack/elastic-agent/pkg/composable/providers/docker/docker.go
@@ -18,6 +18,9 @@ import (
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
 )
 
+// ContainerPriority is the priority that container mappings are added to the provider.
+const ContainerPriority = 0
+
 func init() {
 	composable.Providers.AddDynamicProvider("docker", DynamicProviderBuilder)
 }
@@ -76,7 +79,7 @@ func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
 					delete(stoppers, data.container.ID)
 					return
 				}
-				comm.AddOrUpdate(data.container.ID, data.mapping, data.processors)
+				comm.AddOrUpdate(data.container.ID, ContainerPriority, data.mapping, data.processors)
 			case event := <-stopListener.Events():
 				data, err := generateData(event)
 				if err != nil {
diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go
index 200bedbe8780..1a85005df19c 100644
--- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go
+++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go
@@ -28,4 +28,5 @@ type Config struct {
 func (c *Config) InitDefaults() {
 	c.SyncPeriod = 10 * time.Minute
 	c.CleanupTimeout = 60 * time.Second
+	c.Scope = "node"
 }
diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go
index c777c8a05ce7..9cea442dc6bb 100644
--- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go
+++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go
@@ -15,6 +15,13 @@ import (
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
 )
 
+const (
+	// PodPriority is the priority that pod mappings are added to the provider.
+	PodPriority = 0
+	// ContainerPriority is the priority that container mappings are added to the provider.
+	ContainerPriority = 1
+)
+
 func init() {
 	composable.Providers.AddDynamicProvider("kubernetes", DynamicProviderBuilder)
 }
@@ -54,13 +61,13 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
 
 	// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
 	// when cluster scope is enforced.
+	p.logger.Infof("Kubernetes provider started with %s scope", p.config.Scope)
 	if p.config.Scope == "node" {
+		p.logger.Debugf("Initializing Kubernetes watcher using node: %v", p.config.Node)
 		p.config.Node = kubernetes.DiscoverKubernetesNode(p.logger, p.config.Node, kubernetes.IsInCluster(p.config.KubeConfig), client)
 	} else {
 		p.config.Node = ""
 	}
-	p.logger.Infof("Kubernetes provider started with %s scope", p.config.Scope)
-	p.logger.Debugf("Initializing Kubernetes watcher using node: %v", p.config.Node)
 
 	watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
 		SyncTimeout: p.config.SyncPeriod,
@@ -70,7 +77,6 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
 	if err != nil {
 		return errors.New(err, "couldn't create kubernetes watcher")
 	}
-
 	watcher.AddEventHandler(&eventWatcher{p.logger, p.config.CleanupTimeout, comm})
 
 	err = watcher.Start()
@@ -104,7 +110,7 @@ func (p *eventWatcher) emitRunning(pod *kubernetes.Pod) {
 	// Emit the pod
 	// We emit Pod + containers to ensure that configs matching Pod only
 	// get Pod metadata (not specific to any container)
-	p.comm.AddOrUpdate(string(pod.GetUID()), mapping, processors)
+	p.comm.AddOrUpdate(string(pod.GetUID()), PodPriority, mapping, processors)
 
 	// Emit all containers in the pod
 	p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses)
@@ -161,7 +167,7 @@ func (p *eventWatcher) emitContainers(pod *kubernetes.Pod, containers []kubernet
 		}
 
 		// Emit the container
-		p.comm.AddOrUpdate(eventID, mapping, processors)
+		p.comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors)
 	}
 }
 
diff --git a/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic.go b/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic.go
index 240b7251aa06..b8466055e9e5 100644
--- a/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic.go
+++ b/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic.go
@@ -14,6 +14,9 @@ import (
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
 )
 
+// ItemPriority is the priority that item mappings are added to the provider.
+const ItemPriority = 0
+
 func init() {
 	composable.Providers.AddDynamicProvider("local_dynamic", DynamicProviderBuilder)
 }
@@ -30,7 +33,7 @@ type dynamicProvider struct {
 // Run runs the environment context provider.
 func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
 	for i, item := range c.Items {
-		if err := comm.AddOrUpdate(strconv.Itoa(i), item.Mapping, item.Processors); err != nil {
+		if err := comm.AddOrUpdate(strconv.Itoa(i), ItemPriority, item.Mapping, item.Processors); err != nil {
 			return errors.New(err, fmt.Sprintf("failed to add mapping for index %d", i), errors.TypeUnexpected)
 		}
 	}
diff --git a/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic_test.go b/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic_test.go
index 9e8bc7a394ce..1576d51194a5 100644
--- a/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic_test.go
+++ b/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic_test.go
@@ -69,11 +69,13 @@ func TestContextProvider(t *testing.T) {
 
 	curr1, ok1 := comm.Current("0")
 	assert.True(t, ok1)
+	assert.Equal(t, ItemPriority, curr1.Priority)
 	assert.Equal(t, mapping1, curr1.Mapping)
 	assert.Equal(t, processors1, curr1.Processors)
 
 	curr2, ok2 := comm.Current("1")
 	assert.True(t, ok2)
+	assert.Equal(t, ItemPriority, curr2.Priority)
 	assert.Equal(t, mapping2, curr2.Mapping)
 	assert.Equal(t, processors2, curr2.Processors)
 }
diff --git a/x-pack/elastic-agent/pkg/composable/testing/dynamic.go b/x-pack/elastic-agent/pkg/composable/testing/dynamic.go
index 3943ac4b3bb6..bfa48dff57da 100644
--- a/x-pack/elastic-agent/pkg/composable/testing/dynamic.go
+++ b/x-pack/elastic-agent/pkg/composable/testing/dynamic.go
@@ -11,6 +11,7 @@ import (
 
 // DynamicState is the state of the dynamic mapping.
 type DynamicState struct {
+	Priority   int
 	Mapping    map[string]interface{}
 	Processors []map[string]interface{}
 }
@@ -34,7 +35,7 @@ func NewDynamicComm(ctx context.Context) *DynamicComm {
 }
 
 // AddOrUpdate adds or updates a current mapping.
-func (t *DynamicComm) AddOrUpdate(id string, mapping map[string]interface{}, processors []map[string]interface{}) error {
+func (t *DynamicComm) AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error {
 	var err error
 	mapping, err = CloneMap(mapping)
 	if err != nil {
@@ -53,6 +54,7 @@ func (t *DynamicComm) AddOrUpdate(id string, mapping map[string]interface{}, pro
 		t.previous[id] = prev
 	}
 	t.current[id] = DynamicState{
+		Priority:   priority,
 		Mapping:    mapping,
 		Processors: processors,
 	}