diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index a73b650aac2b5..c9947081b2de5 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -248,7 +248,9 @@ class ControllerServer( setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs). setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs). setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs). - setInterBrokerListenerName(config.interBrokerListenerName.value()) + setInterBrokerListenerName(config.interBrokerListenerName.value()). + setControllerPerformanceSamplePeriodMs(config.controllerPerformanceSamplePeriodMs). + setControllerPerformanceAlwaysLogThresholdMs(config.controllerPerformanceAlwaysLogThresholdMs) } controller = controllerBuilder.build() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 2734115641b10..adec8ed759563 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -241,6 +241,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val initialRegistrationTimeoutMs: Int = getInt(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG) val brokerHeartbeatIntervalMs: Int = getInt(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG) val brokerSessionTimeoutMs: Int = getInt(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG) + val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS) + val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS) def requiresZookeeper: Boolean = processRoles.isEmpty def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty diff --git a/metadata/src/main/java/org/apache/kafka/controller/EventPerformanceMonitor.java b/metadata/src/main/java/org/apache/kafka/controller/EventPerformanceMonitor.java new file mode 100644 index 0000000000000..fbe8b1c3cbbb8 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/EventPerformanceMonitor.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.text.DecimalFormat; +import java.util.AbstractMap; +import java.util.Map; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * Track the performance of controller events. Periodically log the slowest events. + * Log any event slower than a certain threshold. + */ +class EventPerformanceMonitor { + /** + * The format to use when displaying milliseconds. + */ + private static final DecimalFormat MILLISECOND_DECIMAL_FORMAT = new DecimalFormat("#0.00"); + + static class Builder { + LogContext logContext = null; + long periodNs = SECONDS.toNanos(60); + long alwaysLogThresholdNs = SECONDS.toNanos(2); + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setPeriodNs(long periodNs) { + this.periodNs = periodNs; + return this; + } + + Builder setAlwaysLogThresholdNs(long alwaysLogThresholdNs) { + this.alwaysLogThresholdNs = alwaysLogThresholdNs; + return this; + } + + EventPerformanceMonitor build() { + if (logContext == null) logContext = new LogContext(); + return new EventPerformanceMonitor(logContext, + periodNs, + alwaysLogThresholdNs); + } + } + + /** + * The log4j object to use. + */ + private final Logger log; + + /** + * The period in nanoseconds. + */ + private long periodNs; + + /** + * The always-log threshold in nanoseconds. + */ + private long alwaysLogThresholdNs; + + /** + * The name of the slowest event we've seen so far, or null if none has been seen. + */ + private String slowestEventName; + + /** + * The duration of the slowest event we've seen so far, or 0 if none has been seen. + */ + private long slowestEventDurationNs; + + /** + * The total duration of all the events we've seen. + */ + private long totalEventDurationNs; + + /** + * The number of events we've seen. + */ + private int numEvents; + + private EventPerformanceMonitor( + LogContext logContext, + long periodNs, + long alwaysLogThresholdNs + ) { + this.log = logContext.logger(EventPerformanceMonitor.class); + this.periodNs = periodNs; + this.alwaysLogThresholdNs = alwaysLogThresholdNs; + reset(); + } + + long periodNs() { + return periodNs; + } + + Map.Entry slowestEvent() { + return new AbstractMap.SimpleImmutableEntry<>(slowestEventName, slowestEventDurationNs); + } + + /** + * Reset all internal state. + */ + void reset() { + this.slowestEventName = null; + this.slowestEventDurationNs = 0; + this.totalEventDurationNs = 0; + this.numEvents = 0; + } + + /** + * Handle a controller event being finished. + * + * @param name The name of the controller event. + * @param durationNs The duration of the controller event in nanoseconds. + */ + void observeEvent(String name, long durationNs) { + String message = doObserveEvent(name, durationNs); + if (message != null) { + log.error("{}", message); + } + } + + /** + * Handle a controller event being finished. + * + * @param name The name of the controller event. + * @param durationNs The duration of the controller event in nanoseconds. + * + * @return The message to log, or null otherwise. + */ + String doObserveEvent(String name, long durationNs) { + if (slowestEventName == null || slowestEventDurationNs < durationNs) { + slowestEventName = name; + slowestEventDurationNs = durationNs; + } + totalEventDurationNs += durationNs; + numEvents++; + if (durationNs < alwaysLogThresholdNs) { + return null; + } + return "Exceptionally slow controller event " + name + " took " + + NANOSECONDS.toMillis(durationNs) + " ms."; + } + + /** + * Generate a log message summarizing the events of the last period, + * and then reset our internal state. + */ + void generatePeriodicPerformanceMessage() { + String message = periodicPerformanceMessage(); + log.info("{}", message); + reset(); + } + + /** + * Generate a log message summarizing the events of the last period. + * + * @return The summary string. + */ + String periodicPerformanceMessage() { + StringBuilder bld = new StringBuilder(); + bld.append("In the last "); + bld.append(NANOSECONDS.toMillis(periodNs)); + bld.append(" ms period, "); + if (numEvents == 0) { + bld.append("there were no controller events completed."); + } else { + bld.append(numEvents).append(" controller events were completed, which took an average of "); + bld.append(formatNsAsDecimalMs(totalEventDurationNs / numEvents)); + bld.append(" ms each. The slowest event was ").append(slowestEventName); + bld.append(", which took "); + bld.append(formatNsAsDecimalMs(slowestEventDurationNs)); + bld.append(" ms."); + } + return bld.toString(); + } + + /** + * Translate a duration in nanoseconds to a decimal duration in milliseconds. + * + * @param durationNs The duration in nanoseconds. + * @return The decimal duration in milliseconds. + */ + static String formatNsAsDecimalMs(long durationNs) { + double number = NANOSECONDS.toMicros(durationNs); + number /= 1000; + return MILLISECOND_DECIMAL_FORMAT.format(number); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 3e3b362c85b28..acf5c87f8163e 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -175,16 +175,21 @@ */ public final class QuorumController implements Controller { /** - * The maximum records that the controller will write in a single batch. + * The default maximum records that the controller will write in a single batch. */ - private static final int MAX_RECORDS_PER_BATCH = 10000; + private static final int DEFAULT_MAX_RECORDS_PER_BATCH = 10000; + + /** + * The default minimum event time that can be logged as a slow event. + */ + private static final int DEFAULT_MIN_SLOW_EVENT_TIME_MS = 200; /** * The maximum records any user-initiated operation is allowed to generate. * * For now, this is set to the maximum records in a single batch. */ - static final int MAX_RECORDS_PER_USER_OP = MAX_RECORDS_PER_BATCH; + static final int MAX_RECORDS_PER_USER_OP = DEFAULT_MAX_RECORDS_PER_BATCH; /** * A builder class which creates the QuorumController. @@ -213,7 +218,9 @@ public static class Builder { private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP; private Map staticConfig = Collections.emptyMap(); private BootstrapMetadata bootstrapMetadata = null; - private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH; + private int maxRecordsPerBatch = DEFAULT_MAX_RECORDS_PER_BATCH; + private long controllerPerformanceSamplePeriodMs = 60000L; + private long controllerPerformanceAlwaysLogThresholdMs = 2000L; private DelegationTokenCache tokenCache; private String tokenSecretKeyString; private long delegationTokenMaxLifeMs; @@ -321,6 +328,16 @@ public Builder setMaxRecordsPerBatch(int maxRecordsPerBatch) { return this; } + public Builder setControllerPerformanceSamplePeriodMs(long controllerPerformanceSamplePeriodMs) { + this.controllerPerformanceSamplePeriodMs = controllerPerformanceSamplePeriodMs; + return this; + } + + public Builder setControllerPerformanceAlwaysLogThresholdMs(long controllerPerformanceAlwaysLogThresholdMs) { + this.controllerPerformanceAlwaysLogThresholdMs = controllerPerformanceAlwaysLogThresholdMs; + return this; + } + public Builder setCreateTopicPolicy(Optional createTopicPolicy) { this.createTopicPolicy = createTopicPolicy; return this; @@ -433,7 +450,9 @@ public QuorumController build() throws Exception { delegationTokenExpiryTimeMs, delegationTokenExpiryCheckIntervalMs, uncleanLeaderElectionCheckIntervalMs, - interBrokerListenerName + interBrokerListenerName, + controllerPerformanceSamplePeriodMs, + controllerPerformanceAlwaysLogThresholdMs ); } catch (Exception e) { Utils.closeQuietly(queue, "event queue"); @@ -524,6 +543,7 @@ private void handleEventEnd(String name, long startProcessingTimeNs) { long deltaNs = endProcessingTime - startProcessingTimeNs; log.debug("Processed {} in {} us", name, MICROSECONDS.convert(deltaNs, NANOSECONDS)); + performanceMonitor.observeEvent(name, deltaNs); controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs)); } @@ -536,6 +556,8 @@ private Throwable handleEventException( if (startProcessingTimeNs.isPresent()) { long endProcessingTime = time.nanoseconds(); long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong(); + performanceMonitor.observeEvent(name, deltaNs); + controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs)); deltaUs = OptionalLong.of(MICROSECONDS.convert(deltaNs, NANOSECONDS)); } else { deltaUs = OptionalLong.empty(); @@ -1446,6 +1468,11 @@ private void replay(ApiMessage message, Optional snapshotId, lon */ private final RecordRedactor recordRedactor; + /** + * Monitors the performance of controller events and generates logs about it. + */ + private final EventPerformanceMonitor performanceMonitor; + private QuorumController( FaultHandler nonFatalFaultHandler, FaultHandler fatalFaultHandler, @@ -1477,7 +1504,9 @@ private QuorumController( long delegationTokenExpiryTimeMs, long delegationTokenExpiryCheckIntervalMs, long uncleanLeaderElectionCheckIntervalMs, - String interBrokerListenerName + String interBrokerListenerName, + long controllerPerformanceSamplePeriodMs, + long controllerPerformanceAlwaysLogThresholdMs ) { this.nonFatalFaultHandler = nonFatalFaultHandler; this.fatalFaultHandler = fatalFaultHandler; @@ -1574,6 +1603,11 @@ private QuorumController( this.metaLogListener = new QuorumMetaLogListener(); this.curClaimEpoch = -1; this.recordRedactor = new RecordRedactor(configSchema); + this.performanceMonitor = new EventPerformanceMonitor.Builder(). + setLogContext(logContext). + setPeriodNs(TimeUnit.MILLISECONDS.toNanos(controllerPerformanceSamplePeriodMs)). + setAlwaysLogThresholdNs(TimeUnit.MILLISECONDS.toNanos(controllerPerformanceAlwaysLogThresholdMs)). + build(); if (maxIdleIntervalNs.isPresent()) { registerWriteNoOpRecord(maxIdleIntervalNs.getAsLong()); } @@ -1587,7 +1621,7 @@ private QuorumController( } registerElectUnclean(TimeUnit.MILLISECONDS.toNanos(uncleanLeaderElectionCheckIntervalMs)); registerExpireDelegationTokens(MILLISECONDS.toNanos(delegationTokenExpiryCheckIntervalMs)); - + registerGeneratePeriodicPerformanceMessage(); // OffsetControlManager must be initialized last, because its constructor will take the // initial in-memory snapshot of all extant timeline data structures. this.offsetControl = new OffsetControlManager.Builder(). @@ -1597,7 +1631,6 @@ private QuorumController( setTime(time). build(); log.info("Creating new QuorumController with clusterId {}", clusterId); - this.raftClient.register(metaLogListener); } @@ -1681,6 +1714,21 @@ private void registerElectUnclean(long checkIntervalNs) { EnumSet.of(PeriodicTaskFlag.VERBOSE))); } + /** + * Register the generatePeriodicPerformanceMessage task. + * + * This task periodically logs some statistics about controller performance. + */ + private void registerGeneratePeriodicPerformanceMessage() { + periodicControl.registerTask(new PeriodicTask("generatePeriodicPerformanceMessage", + () -> { + performanceMonitor.generatePeriodicPerformanceMessage(); + return ControllerResult.of(Collections.emptyList(), false); + }, + performanceMonitor.periodNs(), + EnumSet.noneOf(PeriodicTaskFlag.class))); + } + /** * Register the delegation token expiration task. * diff --git a/metadata/src/test/java/org/apache/kafka/controller/EventPerformanceMonitorTest.java b/metadata/src/test/java/org/apache/kafka/controller/EventPerformanceMonitorTest.java new file mode 100644 index 0000000000000..81e01679dc0fd --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/controller/EventPerformanceMonitorTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.junit.jupiter.api.Test; + +import java.util.AbstractMap; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class EventPerformanceMonitorTest { + @Test + public void testDefaultPeriodNs() { + assertEquals(SECONDS.toNanos(60), + new EventPerformanceMonitor.Builder().build().periodNs()); + } + + @Test + public void testSlowestEventWithNoEvents() { + EventPerformanceMonitor monitor = new EventPerformanceMonitor.Builder().build(); + assertEquals(new AbstractMap.SimpleImmutableEntry<>(null, 0L), + monitor.slowestEvent()); + } + + @Test + public void testSlowestEventWithThreeEvents() { + EventPerformanceMonitor monitor = new EventPerformanceMonitor.Builder().build(); + monitor.observeEvent("fastEvent", MILLISECONDS.toNanos(2)); + monitor.observeEvent("slowEvent", MILLISECONDS.toNanos(100)); + assertEquals(new AbstractMap.SimpleImmutableEntry<>("slowEvent", MILLISECONDS.toNanos(100)), + monitor.slowestEvent()); + } + + @Test + public void testLogSlowEvent() { + EventPerformanceMonitor monitor = new EventPerformanceMonitor.Builder().build(); + assertEquals("Exceptionally slow controller event slowEvent took 5000 ms.", + monitor.doObserveEvent("slowEvent", SECONDS.toNanos(5))); + } + + @Test + public void testDoNotLogFastEvent() { + EventPerformanceMonitor monitor = new EventPerformanceMonitor.Builder().build(); + assertNull(monitor.doObserveEvent("slowEvent", MILLISECONDS.toNanos(250))); + } + + @Test + public void testFormatNsAsDecimalMsWithZero() { + assertEquals("0.00", + EventPerformanceMonitor.formatNsAsDecimalMs(0)); + } + + @Test + public void testFormatNsAsDecimalMsWith100() { + assertEquals("100.00", + EventPerformanceMonitor.formatNsAsDecimalMs(MILLISECONDS.toNanos(100))); + } + + @Test + public void testFormatNsAsDecimalMsWith123456789() { + assertEquals("123.46", + EventPerformanceMonitor.formatNsAsDecimalMs(123456789)); + } + + @Test + public void testPeriodicPerformanceMessageWithNoEvents() { + EventPerformanceMonitor monitor = new EventPerformanceMonitor.Builder().build(); + assertEquals("In the last 60000 ms period, there were no controller events completed.", + monitor.periodicPerformanceMessage()); + } + + @Test + public void testPeriodicPerformanceMessageWithOneEvent() { + EventPerformanceMonitor monitor = new EventPerformanceMonitor.Builder().build(); + monitor.observeEvent("myEvent", MILLISECONDS.toNanos(12)); + assertEquals("In the last 60000 ms period, 1 controller events were completed, which took an " + + "average of 12.00 ms each. The slowest event was myEvent, which took 12.00 ms.", + monitor.periodicPerformanceMessage()); + } + + @Test + public void testPeriodicPerformanceMessageWithThreeEvents() { + EventPerformanceMonitor monitor = new EventPerformanceMonitor.Builder().build(); + monitor.observeEvent("myEvent", MILLISECONDS.toNanos(12)); + monitor.observeEvent("myEvent2", MILLISECONDS.toNanos(19)); + monitor.observeEvent("myEvent3", MILLISECONDS.toNanos(1)); + assertEquals("In the last 60000 ms period, 3 controller events were completed, which took an " + + "average of 10.67 ms each. The slowest event was myEvent2, which took 19.00 ms.", + monitor.periodicPerformanceMessage()); + } + + @Test + public void testGeneratePeriodicPerformanceMessageResetsState() { + EventPerformanceMonitor monitor = new EventPerformanceMonitor.Builder().build(); + monitor.observeEvent("myEvent", MILLISECONDS.toNanos(12)); + monitor.observeEvent("myEvent2", MILLISECONDS.toNanos(19)); + monitor.observeEvent("myEvent3", MILLISECONDS.toNanos(1)); + monitor.generatePeriodicPerformanceMessage(); + assertEquals("In the last 60000 ms period, there were no controller events completed.", + monitor.periodicPerformanceMessage()); + } +} diff --git a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java index b30a04b6a7961..a693471b7b3f5 100644 --- a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java +++ b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java @@ -114,6 +114,14 @@ public class KRaftConfigs { public static final String SERVER_MAX_STARTUP_TIME_MS_DOC = "The maximum number of milliseconds we will wait for the server to come up. " + "By default there is no limit. This should be used for testing only."; + public static final String CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS = "controller.performance.sample.period.ms"; + public static final long CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS_DEFAULT = 60000; + public static final String CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS_DOC = "The number of milliseconds between periodic controller event performance log messages."; + + public static final String CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS = "controller.performance.always.log.threshold.ms"; + public static final long CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DEFAULT = 2000; + public static final String CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DOC = "We will log an error message about controller events that take longer than this threshold."; + public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC) .define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC) @@ -131,5 +139,7 @@ public class KRaftConfigs { .define(METADATA_MAX_RETENTION_BYTES_CONFIG, LONG, METADATA_MAX_RETENTION_BYTES_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_BYTES_DOC) .define(METADATA_MAX_RETENTION_MILLIS_CONFIG, LONG, LogConfig.DEFAULT_RETENTION_MS, null, HIGH, METADATA_MAX_RETENTION_MILLIS_DOC) .define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC) + .defineInternal(CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS, LONG, CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS_DEFAULT, atLeast(100), MEDIUM, CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS_DOC) + .defineInternal(CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS, LONG, CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DEFAULT, atLeast(0), MEDIUM, CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DOC) .defineInternal(SERVER_MAX_STARTUP_TIME_MS_CONFIG, LONG, SERVER_MAX_STARTUP_TIME_MS_DEFAULT, atLeast(0), MEDIUM, SERVER_MAX_STARTUP_TIME_MS_DOC); }