diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index e58ce70ded7a..9b8017f0beec 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -1025,14 +1025,11 @@ acceptedBreaks:
       new: "class org.apache.iceberg.types.Types.NestedField"
       justification: "new Constructor added"
     org.apache.iceberg:iceberg-core:
-    - code: "java.method.visibilityReduced"
-      old: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::<init>()"
-      new: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::<init>()"
-      justification: "Deprecations for 1.6.0 release"
     - code: "java.element.noLongerDeprecated"
       old: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::<init>()"
       new: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::<init>()"
-      justification: "Constructor became private as part of deprecations cleanup for 1.6.0 release"
+      justification: "Constructor became private as part of deprecations cleanup for\
+        \ 1.6.0 release"
     - code: "java.element.noLongerDeprecated"
       old: "method void org.apache.iceberg.rest.auth.OAuth2Util.AuthSession::<init>(java.util.Map<java.lang.String,\
         \ java.lang.String>, java.lang.String, java.lang.String, java.lang.String,\
@@ -1056,6 +1053,10 @@ acceptedBreaks:
     - code: "java.method.removed"
       old: "method org.apache.iceberg.DataFiles.Builder org.apache.iceberg.DataFiles.Builder::withEqualityFieldIds(java.util.List<java.lang.Integer>)"
       justification: "Deprecations for 1.6.0 release"
+    - code: "java.method.visibilityReduced"
+      old: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::<init>()"
+      new: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::<init>()"
+      justification: "Deprecations for 1.6.0 release"
   "1.6.0":
     org.apache.iceberg:iceberg-common:
     - code: "java.method.removed"
@@ -1083,6 +1084,10 @@ acceptedBreaks:
         \ java.lang.Object[]) throws java.lang.Exception"
       justification: "Reduced visibility and scoped to package"
     org.apache.iceberg:iceberg-core:
+    - code: "java.class.defaultSerializationChanged"
+      old: "class org.apache.iceberg.GenericManifestFile"
+      new: "class org.apache.iceberg.GenericManifestFile"
+      justification: "Serialization across versions is not supported"
     - code: "java.class.removed"
       old: "enum org.apache.iceberg.BaseMetastoreTableOperations.CommitStatus"
       justification: "Removing deprecated code"
diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index a9c6f50e0b36..641a7a3c9aec 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -34,6 +34,7 @@
 import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -52,7 +53,8 @@ public class AllManifestsTable extends BaseMetadataTable {
   public static final Types.NestedField REF_SNAPSHOT_ID =
       Types.NestedField.required(18, "reference_snapshot_id", Types.LongType.get());
 
-  private static final Schema MANIFEST_FILE_SCHEMA =
+  @VisibleForTesting
+  static final Schema MANIFEST_FILE_SCHEMA =
       new Schema(
           Types.NestedField.required(14, "content", Types.IntegerType.get()),
           Types.NestedField.required(1, "path", Types.StringType.get()),
@@ -119,6 +121,7 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext
     protected CloseableIterable<FileScanTask> doPlanFiles() {
       FileIO io = table().io();
       Map<Integer, PartitionSpec> specs = Maps.newHashMap(table().specs());
+      Schema dataTableSchema = table().schema();
       Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
 
       SnapshotEvaluator snapshotEvaluator =
@@ -132,7 +135,13 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
               snap -> {
                 if (snap.manifestListLocation() != null) {
                   return new ManifestListReadTask(
-                      io, schema(), specs, snap.manifestListLocation(), filter, snap.snapshotId());
+                      dataTableSchema,
+                      io,
+                      schema(),
+                      specs,
+                      snap.manifestListLocation(),
+                      filter,
+                      snap.snapshotId());
                 } else {
                   return StaticDataTask.of(
                       io.newInputFile(
@@ -149,6 +158,7 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
   }
 
   static class ManifestListReadTask implements DataTask {
+    private final Schema dataTableSchema;
     private final FileIO io;
     private final Schema schema;
     private final Map<Integer, PartitionSpec> specs;
@@ -158,12 +168,14 @@ static class ManifestListReadTask implements DataTask {
     private DataFile lazyDataFile = null;
 
     ManifestListReadTask(
+        Schema dataTableSchema,
         FileIO io,
         Schema schema,
         Map<Integer, PartitionSpec> specs,
         String manifestListLocation,
         Expression residual,
         long referenceSnapshotId) {
+      this.dataTableSchema = dataTableSchema;
       this.io = io;
       this.schema = schema;
       this.specs = specs;
@@ -244,6 +256,31 @@ public Expression residual() {
     public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
+
+    @Override
+    public Schema schema() {
+      return schema;
+    }
+
+    Schema dataTableSchema() {
+      return dataTableSchema;
+    }
+
+    FileIO io() {
+      return io;
+    }
+
+    Map<Integer, PartitionSpec> specsById() {
+      return specs;
+    }
+
+    String manifestListLocation() {
+      return manifestListLocation;
+    }
+
+    long referenceSnapshotId() {
+      return referenceSnapshotId;
+    }
   }
 
   static StaticDataTask.Row manifestFileToRow(
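
Note: the accessors added above exist so the new `AllManifestsTableTaskParser` (next file) can read the task's state during serialization. A minimal round-trip sketch, mirroring `TestAllManifestsTableTaskParser#createTask()` later in this diff; the constructor and `MANIFEST_FILE_SCHEMA` are package-private, so this only compiles inside package `org.apache.iceberg`:

```java
// Schema matching TestBase.SCHEMA (field ids/names taken from the test JSON below).
Schema dataTableSchema =
    new Schema(
        Types.NestedField.required(3, "id", Types.IntegerType.get()),
        Types.NestedField.required(4, "data", Types.StringType.get()));
HadoopFileIO io = new HadoopFileIO();
io.initialize(ImmutableMap.of("k1", "v1"));
Map<Integer, PartitionSpec> specsById =
    PartitionUtil.indexSpecs(
        ImmutableList.of(PartitionSpec.builderFor(dataTableSchema).bucket("data", 16).build()));

AllManifestsTable.ManifestListReadTask task =
    new AllManifestsTable.ManifestListReadTask(
        dataTableSchema,
        io,
        AllManifestsTable.MANIFEST_FILE_SCHEMA,
        specsById,
        "/path/manifest-list-file.avro",
        Expressions.equal("id", 1),
        1L);

String json = ScanTaskParser.toJson(task); // tagged "all-manifests-table-task"
FileScanTask restored = ScanTaskParser.fromJson(json, false /* caseSensitive */);
```
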
diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTableTaskParser.java b/core/src/main/java/org/apache/iceberg/AllManifestsTableTaskParser.java
new file mode 100644
index 000000000000..24c28d95466c
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTableTaskParser.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionParser;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileIOParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.JsonUtil;
+import org.apache.iceberg.util.PartitionUtil;
+
+class AllManifestsTableTaskParser {
+  private static final String DATA_TABLE_SCHEMA = "data-table-schema";
+  private static final String FILE_IO = "file-io";
+  private static final String SCHEMA = "schema";
+  private static final String SPECS = "partition-specs";
+  private static final String MANIFEST_LIST_LOCATION = "manifest-list-location";
+  private static final String RESIDUAL = "residual-filter";
+  private static final String REFERENCE_SNAPSHOT_ID = "reference-snapshot-id";
+
+  private AllManifestsTableTaskParser() {}
+
+  static void toJson(AllManifestsTable.ManifestListReadTask task, JsonGenerator generator)
+      throws IOException {
+    Preconditions.checkArgument(task != null, "Invalid manifest task: null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");
+
+    generator.writeFieldName(DATA_TABLE_SCHEMA);
+    SchemaParser.toJson(task.dataTableSchema(), generator);
+
+    generator.writeFieldName(FILE_IO);
+    FileIOParser.toJson(task.io(), generator);
+
+    generator.writeFieldName(SCHEMA);
+    SchemaParser.toJson(task.schema(), generator);
+
+    generator.writeArrayFieldStart(SPECS);
+    for (PartitionSpec spec : task.specsById().values()) {
+      PartitionSpecParser.toJson(spec, generator);
+    }
+
+    generator.writeEndArray();
+
+    generator.writeStringField(MANIFEST_LIST_LOCATION, task.manifestListLocation());
+
+    generator.writeFieldName(RESIDUAL);
+    ExpressionParser.toJson(task.residual(), generator);
+
+    generator.writeNumberField(REFERENCE_SNAPSHOT_ID, task.referenceSnapshotId());
+  }
+
+  static AllManifestsTable.ManifestListReadTask fromJson(JsonNode jsonNode) {
+    Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for manifest task: null");
+    Preconditions.checkArgument(
+        jsonNode.isObject(), "Invalid JSON node for manifest task: non-object (%s)", jsonNode);
+
+    Schema dataTableSchema = SchemaParser.fromJson(JsonUtil.get(DATA_TABLE_SCHEMA, jsonNode));
+    FileIO fileIO = FileIOParser.fromJson(JsonUtil.get(FILE_IO, jsonNode), null);
+    Schema schema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode));
+
+    JsonNode specsArray = JsonUtil.get(SPECS, jsonNode);
+    Preconditions.checkArgument(
+        specsArray.isArray(), "Invalid JSON node for partition specs: non-array (%s)", specsArray);
+
+    ImmutableList.Builder<PartitionSpec> specsBuilder = ImmutableList.builder();
+    for (JsonNode specNode : specsArray) {
+      PartitionSpec spec = PartitionSpecParser.fromJson(dataTableSchema, specNode);
+      specsBuilder.add(spec);
+    }
+
+    Map<Integer, PartitionSpec> specsById = PartitionUtil.indexSpecs(specsBuilder.build());
+    String manifestListLocation = JsonUtil.getString(MANIFEST_LIST_LOCATION, jsonNode);
+    Expression residualFilter = ExpressionParser.fromJson(JsonUtil.get(RESIDUAL, jsonNode));
+    long referenceSnapshotId = JsonUtil.getLong(REFERENCE_SNAPSHOT_ID, jsonNode);
+
+    return new AllManifestsTable.ManifestListReadTask(
+        dataTableSchema,
+        fileIO,
+        schema,
+        specsById,
+        manifestListLocation,
+        residualFilter,
+        referenceSnapshotId);
+  }
+}
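
Note that `toJson` above writes bare fields rather than a self-contained object: `ScanTaskParser` owns the enclosing JSON object and emits the `task-type` discriminator before delegating. A simplified sketch of that composition (the wrapping lives in `ScanTaskParser`, shown later in this diff):

```java
// Simplified; assumes `generator` and `task` from the surrounding scope.
generator.writeStartObject();
generator.writeStringField("task-type", "all-manifests-table-task");
AllManifestsTableTaskParser.toJson(task, generator);
generator.writeEndObject();
```

The same convention holds for `FilesTableTaskParser` and `ManifestEntriesTableTaskParser` below, while `ManifestFileParser` does write its own start/end object, since a manifest file is a nested value rather than a top-level task.
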
diff --git a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
index 526bb42ea687..ae77bff7036d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
@@ -33,7 +33,6 @@
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -92,15 +91,9 @@ static CloseableIterable<FileScanTask> planFiles(
                 evalCache.get(manifest.partitionSpecId()).eval(manifest)
                     && manifestContentEvaluator.eval(manifest));
 
-    String schemaString = SchemaParser.toJson(projectedSchema);
-    String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
-    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
-
     return CloseableIterable.transform(
         filteredManifests,
-        manifest ->
-            new ManifestReadTask(
-                table, manifest, projectedSchema, schemaString, specString, residuals));
+        manifest -> new ManifestReadTask(table, manifest, projectedSchema, filter));
   }
 
   /**
@@ -283,19 +276,29 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
     private final ManifestFile manifest;
     private final Map<Integer, PartitionSpec> specsById;
 
+    private ManifestReadTask(
+        Table table, ManifestFile manifest, Schema projection, Expression filter) {
+      this(table.schema(), table.io(), table.specs(), manifest, projection, filter);
+    }
+
     ManifestReadTask(
-        Table table,
+        Schema dataTableSchema,
+        FileIO io,
+        Map<Integer, PartitionSpec> specsById,
         ManifestFile manifest,
         Schema projection,
-        String schemaString,
-        String specString,
-        ResidualEvaluator residuals) {
-      super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
+        Expression filter) {
+      super(
+          DataFiles.fromManifest(manifest),
+          null,
+          SchemaParser.toJson(projection),
+          PartitionSpecParser.toJson(PartitionSpec.unpartitioned()),
+          ResidualEvaluator.unpartitioned(filter));
       this.projection = projection;
-      this.io = table.io();
+      this.io = io;
       this.manifest = manifest;
-      this.specsById = Maps.newHashMap(table.specs());
-      this.dataTableSchema = table.schema();
+      this.specsById = Maps.newHashMap(specsById);
+      this.dataTableSchema = dataTableSchema;
 
       Type fileProjectionType = projection.findType("data_file");
       this.fileProjection =
@@ -311,7 +314,6 @@ public long estimatedRowsCount() {
           + (long) manifest.existingFilesCount();
     }
 
-    @VisibleForTesting
     ManifestFile manifest() {
       return manifest;
     }
@@ -403,5 +405,21 @@ private MetricsUtil.ReadableMetricsStruct readableMetrics(
     public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
+
+    FileIO io() {
+      return io;
+    }
+
+    Map<Integer, PartitionSpec> specsById() {
+      return specsById;
+    }
+
+    Schema dataTableSchema() {
+      return dataTableSchema;
+    }
+
+    Schema projection() {
+      return projection;
+    }
   }
 }
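
The constructor split above (repeated for `BaseFilesTable.ManifestReadTask` in the next file) separates the two ways a task is built; a sketch, with the argument names standing in for values from the surrounding code:

```java
// Planning path: everything is derived from a live Table.
new ManifestReadTask(table, manifest, projection, filter);

// Deserialization path: ManifestEntriesTableTaskParser.fromJson rebuilds the
// task from parsed components, with no Table object available.
new ManifestReadTask(dataTableSchema, io, specsById, manifest, projection, filter);
```
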
diff --git a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
index 149edf950032..b71744f739c7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
@@ -29,7 +29,6 @@
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -80,16 +79,11 @@ private static CloseableIterable<FileScanTask> planFiles(
         CloseableIterable.filter(
             manifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
 
-    String schemaString = SchemaParser.toJson(projectedSchema);
-    String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
     Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
-    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
     return CloseableIterable.transform(
         filteredManifests,
-        manifest ->
-            new ManifestReadTask(
-                table, manifest, projectedSchema, schemaString, specString, residuals));
+        manifest -> new ManifestReadTask(table, manifest, projectedSchema, filter));
   }
 
   abstract static class BaseFilesTableScan extends BaseMetadataTableScan {
@@ -140,18 +134,28 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
     private final Schema dataTableSchema;
     private final Schema projection;
 
+    private ManifestReadTask(
+        Table table, ManifestFile manifest, Schema projection, Expression filter) {
+      this(table.schema(), table.io(), table.specs(), manifest, projection, filter);
+    }
+
     ManifestReadTask(
-        Table table,
+        Schema dataTableSchema,
+        FileIO io,
+        Map<Integer, PartitionSpec> specsById,
         ManifestFile manifest,
         Schema projection,
-        String schemaString,
-        String specString,
-        ResidualEvaluator residuals) {
-      super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
-      this.io = table.io();
-      this.specsById = Maps.newHashMap(table.specs());
+        Expression filter) {
+      super(
+          DataFiles.fromManifest(manifest),
+          null,
+          SchemaParser.toJson(projection),
+          PartitionSpecParser.toJson(PartitionSpec.unpartitioned()),
+          ResidualEvaluator.unpartitioned(filter));
+      this.io = io;
+      this.specsById = Maps.newHashMap(specsById);
       this.manifest = manifest;
-      this.dataTableSchema = table.schema();
+      this.dataTableSchema = dataTableSchema;
       this.projection = projection;
     }
 
@@ -240,9 +244,24 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
 
-    @VisibleForTesting
+    FileIO io() {
+      return io;
+    }
+
+    Map<Integer, PartitionSpec> specsById() {
+      return specsById;
+    }
+
     ManifestFile manifest() {
       return manifest;
     }
+
+    Schema dataTableSchema() {
+      return dataTableSchema;
+    }
+
+    Schema projection() {
+      return projection;
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/FilesTableTaskParser.java b/core/src/main/java/org/apache/iceberg/FilesTableTaskParser.java
new file mode 100644
index 000000000000..365deacebf10
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/FilesTableTaskParser.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionParser;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileIOParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.JsonUtil;
+import org.apache.iceberg.util.PartitionUtil;
+
+class FilesTableTaskParser {
+  private static final String FILE_IO = "file-io";
+  private static final String SPECS = "partition-specs";
+  private static final String SCHEMA = "schema";
+  private static final String PROJECTION = "projection";
+  private static final String RESIDUAL = "residual-filter";
+  private static final String MANIFEST = "manifest-file";
+
+  private FilesTableTaskParser() {}
+
+  static void toJson(BaseFilesTable.ManifestReadTask task, JsonGenerator generator)
+      throws IOException {
+    Preconditions.checkArgument(task != null, "Invalid files task: null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");
+
+    generator.writeFieldName(SCHEMA);
+    SchemaParser.toJson(task.dataTableSchema(), generator);
+
+    generator.writeFieldName(PROJECTION);
+    SchemaParser.toJson(task.projection(), generator);
+
+    generator.writeFieldName(FILE_IO);
+    FileIOParser.toJson(task.io(), generator);
+
+    generator.writeArrayFieldStart(SPECS);
+    for (PartitionSpec spec : task.specsById().values()) {
+      PartitionSpecParser.toJson(spec, generator);
+    }
+
+    generator.writeEndArray();
+
+    generator.writeFieldName(RESIDUAL);
+    ExpressionParser.toJson(task.residual(), generator);
+
+    generator.writeFieldName(MANIFEST);
+    ManifestFileParser.toJson(task.manifest(), generator);
+  }
+
+  static BaseFilesTable.ManifestReadTask fromJson(JsonNode jsonNode) {
+    Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for files task: null");
+    Preconditions.checkArgument(
+        jsonNode.isObject(), "Invalid JSON node for files task: non-object (%s)", jsonNode);
+
+    Schema dataTableSchema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode));
+    Schema projection = SchemaParser.fromJson(JsonUtil.get(PROJECTION, jsonNode));
+
+    FileIO fileIO = FileIOParser.fromJson(JsonUtil.get(FILE_IO, jsonNode), null);
+
+    JsonNode specsArray = JsonUtil.get(SPECS, jsonNode);
+    Preconditions.checkArgument(
+        specsArray.isArray(), "Invalid JSON node for partition specs: non-array (%s)", specsArray);
+
+    ImmutableList.Builder<PartitionSpec> specsBuilder = ImmutableList.builder();
+    for (JsonNode specNode : specsArray) {
+      PartitionSpec spec = PartitionSpecParser.fromJson(dataTableSchema, specNode);
+      specsBuilder.add(spec);
+    }
+
+    Map<Integer, PartitionSpec> specsById = PartitionUtil.indexSpecs(specsBuilder.build());
+    Expression residualFilter = ExpressionParser.fromJson(JsonUtil.get(RESIDUAL, jsonNode));
+    ManifestFile manifestFile = ManifestFileParser.fromJson(JsonUtil.get(MANIFEST, jsonNode));
+
+    return new BaseFilesTable.ManifestReadTask(
+        dataTableSchema, fileIO, specsById, manifestFile, projection, residualFilter);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
index ed94da5e1791..d081e0bdd568 100644
--- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
@@ -105,6 +105,42 @@ public GenericManifestFile(Schema avroSchema) {
     this.keyMetadata = null;
   }
 
+  /** Argument order differs from the public constructor below to avoid a signature conflict. */
+  GenericManifestFile(
+      String path,
+      long length,
+      int specId,
+      ManifestContent content,
+      long sequenceNumber,
+      long minSequenceNumber,
+      Long snapshotId,
+      List<PartitionFieldSummary> partitions,
+      ByteBuffer keyMetadata,
+      Integer addedFilesCount,
+      Long addedRowsCount,
+      Integer existingFilesCount,
+      Long existingRowsCount,
+      Integer deletedFilesCount,
+      Long deletedRowsCount) {
+    this.avroSchema = AVRO_SCHEMA;
+    this.manifestPath = path;
+    this.length = length;
+    this.specId = specId;
+    this.content = content;
+    this.sequenceNumber = sequenceNumber;
+    this.minSequenceNumber = minSequenceNumber;
+    this.snapshotId = snapshotId;
+    this.addedFilesCount = addedFilesCount;
+    this.addedRowsCount = addedRowsCount;
+    this.existingFilesCount = existingFilesCount;
+    this.existingRowsCount = existingRowsCount;
+    this.deletedFilesCount = deletedFilesCount;
+    this.deletedRowsCount = deletedRowsCount;
+    this.partitions = partitions == null ? null : partitions.toArray(new PartitionFieldSummary[0]);
+    this.fromProjectionPos = null;
+    this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
+  }
+
   public GenericManifestFile(
       String path,
       long length,
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTableTaskParser.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTableTaskParser.java
new file mode 100644
index 000000000000..37f84d09ae91
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTableTaskParser.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionParser;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileIOParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.JsonUtil;
+import org.apache.iceberg.util.PartitionUtil;
+
+class ManifestEntriesTableTaskParser {
+  private static final String FILE_IO = "file-io";
+  private static final String SPECS = "partition-specs";
+  private static final String SCHEMA = "schema";
+  private static final String PROJECTION = "projection";
+  private static final String RESIDUAL = "residual-filter";
+  private static final String MANIFEST = "manifest-file";
+
+  private ManifestEntriesTableTaskParser() {}
+
+  static void toJson(BaseEntriesTable.ManifestReadTask task, JsonGenerator generator)
+      throws IOException {
+    Preconditions.checkArgument(task != null, "Invalid manifest task: null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");
+
+    generator.writeFieldName(SCHEMA);
+    SchemaParser.toJson(task.dataTableSchema(), generator);
+
+    generator.writeFieldName(FILE_IO);
+    FileIOParser.toJson(task.io(), generator);
+
+    generator.writeArrayFieldStart(SPECS);
+    for (PartitionSpec spec : task.specsById().values()) {
+      PartitionSpecParser.toJson(spec, generator);
+    }
+
+    generator.writeEndArray();
+
+    generator.writeFieldName(MANIFEST);
+    ManifestFileParser.toJson(task.manifest(), generator);
+
+    generator.writeFieldName(PROJECTION);
+    SchemaParser.toJson(task.projection(), generator);
+
+    generator.writeFieldName(RESIDUAL);
+    ExpressionParser.toJson(task.residual(), generator);
+  }
+
+  static BaseEntriesTable.ManifestReadTask fromJson(JsonNode jsonNode) {
+    Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for manifest task: null");
+    Preconditions.checkArgument(
+        jsonNode.isObject(), "Invalid JSON node for manifest task: non-object (%s)", jsonNode);
+
+    Schema dataTableSchema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode));
+    FileIO fileIO = FileIOParser.fromJson(JsonUtil.get(FILE_IO, jsonNode), null);
+
+    JsonNode specsArray = JsonUtil.get(SPECS, jsonNode);
+    Preconditions.checkArgument(
+        specsArray.isArray(), "Invalid JSON node for partition specs: non-array (%s)", specsArray);
+    ImmutableList.Builder<PartitionSpec> specsBuilder = ImmutableList.builder();
+    for (JsonNode specNode : specsArray) {
+      PartitionSpec spec = PartitionSpecParser.fromJson(dataTableSchema, specNode);
+      specsBuilder.add(spec);
+    }
+
+    Map<Integer, PartitionSpec> specsById = PartitionUtil.indexSpecs(specsBuilder.build());
+
+    ManifestFile manifestFile = ManifestFileParser.fromJson(JsonUtil.get(MANIFEST, jsonNode));
+    Schema projection = SchemaParser.fromJson(JsonUtil.get(PROJECTION, jsonNode));
+    Expression residualFilter = ExpressionParser.fromJson(JsonUtil.get(RESIDUAL, jsonNode));
+
+    return new BaseEntriesTable.ManifestReadTask(
+        dataTableSchema, fileIO, specsById, manifestFile, projection, residualFilter);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFileParser.java b/core/src/main/java/org/apache/iceberg/ManifestFileParser.java
new file mode 100644
index 000000000000..80f1a0fcf7fc
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ManifestFileParser.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.JsonUtil;
+
+class ManifestFileParser {
+  private static final String PATH = "path";
+  private static final String LENGTH = "length";
+  private static final String SPEC_ID = "partition-spec-id";
+  private static final String CONTENT = "content";
+  private static final String SEQUENCE_NUMBER = "sequence-number";
+  private static final String MIN_SEQUENCE_NUMBER = "min-sequence-number";
+  private static final String ADDED_SNAPSHOT_ID = "added-snapshot-id";
+  private static final String ADDED_FILES_COUNT = "added-files-count";
+  private static final String EXISTING_FILES_COUNT = "existing-files-count";
+  private static final String DELETED_FILES_COUNT = "deleted-files-count";
+  private static final String ADDED_ROWS_COUNT = "added-rows-count";
+  private static final String EXISTING_ROWS_COUNT = "existing-rows-count";
+  private static final String DELETED_ROWS_COUNT = "deleted-rows-count";
+  private static final String PARTITION_FIELD_SUMMARY = "partition-field-summary";
+  private static final String KEY_METADATA = "key-metadata";
+
+  private ManifestFileParser() {}
+
+  static void toJson(ManifestFile manifestFile, JsonGenerator generator) throws IOException {
+    Preconditions.checkArgument(manifestFile != null, "Invalid manifest file: null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");
+
+    generator.writeStartObject();
+
+    generator.writeStringField(PATH, manifestFile.path());
+    generator.writeNumberField(LENGTH, manifestFile.length());
+    generator.writeNumberField(SPEC_ID, manifestFile.partitionSpecId());
+
+    if (manifestFile.content() != null) {
+      generator.writeNumberField(CONTENT, manifestFile.content().id());
+    }
+
+    generator.writeNumberField(SEQUENCE_NUMBER, manifestFile.sequenceNumber());
+    generator.writeNumberField(MIN_SEQUENCE_NUMBER, manifestFile.minSequenceNumber());
+
+    if (manifestFile.snapshotId() != null) {
+      generator.writeNumberField(ADDED_SNAPSHOT_ID, manifestFile.snapshotId());
+    }
+
+    if (manifestFile.addedFilesCount() != null) {
+      generator.writeNumberField(ADDED_FILES_COUNT, manifestFile.addedFilesCount());
+    }
+
+    if (manifestFile.existingFilesCount() != null) {
+      generator.writeNumberField(EXISTING_FILES_COUNT, manifestFile.existingFilesCount());
+    }
+
+    if (manifestFile.deletedFilesCount() != null) {
+      generator.writeNumberField(DELETED_FILES_COUNT, manifestFile.deletedFilesCount());
+    }
+
+    if (manifestFile.addedRowsCount() != null) {
+      generator.writeNumberField(ADDED_ROWS_COUNT, manifestFile.addedRowsCount());
+    }
+
+    if (manifestFile.existingRowsCount() != null) {
+      generator.writeNumberField(EXISTING_ROWS_COUNT, manifestFile.existingRowsCount());
+    }
+
+    if (manifestFile.deletedRowsCount() != null) {
+      generator.writeNumberField(DELETED_ROWS_COUNT, manifestFile.deletedRowsCount());
+    }
+
+    if (manifestFile.partitions() != null) {
+      generator.writeArrayFieldStart(PARTITION_FIELD_SUMMARY);
+      for (ManifestFile.PartitionFieldSummary summary : manifestFile.partitions()) {
+        PartitionFieldSummaryParser.toJson(summary, generator);
+      }
+
+      generator.writeEndArray();
+    }
+
+    if (manifestFile.keyMetadata() != null) {
+      generator.writeFieldName(KEY_METADATA);
+      SingleValueParser.toJson(DataFile.KEY_METADATA.type(), manifestFile.keyMetadata(), generator);
+    }
+
+    generator.writeEndObject();
+  }
+
+  static ManifestFile fromJson(JsonNode jsonNode) {
+    Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for manifest file: null");
+    Preconditions.checkArgument(
+        jsonNode.isObject(), "Invalid JSON node for manifest file: non-object (%s)", jsonNode);
+
+    String path = JsonUtil.getString(PATH, jsonNode);
+    long length = JsonUtil.getLong(LENGTH, jsonNode);
+    int specId = JsonUtil.getInt(SPEC_ID, jsonNode);
+
+    ManifestContent manifestContent = null;
+    if (jsonNode.has(CONTENT)) {
+      manifestContent = ManifestContent.fromId(JsonUtil.getInt(CONTENT, jsonNode));
+    }
+
+    long sequenceNumber = JsonUtil.getLong(SEQUENCE_NUMBER, jsonNode);
+    long minSequenceNumber = JsonUtil.getLong(MIN_SEQUENCE_NUMBER, jsonNode);
+
+    Long addedSnapshotId = null;
+    if (jsonNode.has(ADDED_SNAPSHOT_ID)) {
+      addedSnapshotId = JsonUtil.getLong(ADDED_SNAPSHOT_ID, jsonNode);
+    }
+
+    Integer addedFilesCount = null;
+    if (jsonNode.has(ADDED_FILES_COUNT)) {
+      addedFilesCount = JsonUtil.getInt(ADDED_FILES_COUNT, jsonNode);
+    }
+
+    Integer existingFilesCount = null;
+    if (jsonNode.has(EXISTING_FILES_COUNT)) {
+      existingFilesCount = JsonUtil.getInt(EXISTING_FILES_COUNT, jsonNode);
+    }
+
+    Integer deletedFilesCount = null;
+    if (jsonNode.has(DELETED_FILES_COUNT)) {
+      deletedFilesCount = JsonUtil.getInt(DELETED_FILES_COUNT, jsonNode);
+    }
+
+    Long addedRowsCount = null;
+    if (jsonNode.has(ADDED_ROWS_COUNT)) {
+      addedRowsCount = JsonUtil.getLong(ADDED_ROWS_COUNT, jsonNode);
+    }
+
+    Long existingRowsCount = null;
+    if (jsonNode.has(EXISTING_ROWS_COUNT)) {
+      existingRowsCount = JsonUtil.getLong(EXISTING_ROWS_COUNT, jsonNode);
+    }
+
+    Long deletedRowsCount = null;
+    if (jsonNode.has(DELETED_ROWS_COUNT)) {
+      deletedRowsCount = JsonUtil.getLong(DELETED_ROWS_COUNT, jsonNode);
+    }
+
+    List<ManifestFile.PartitionFieldSummary> partitionFieldSummaries = null;
+    if (jsonNode.has(PARTITION_FIELD_SUMMARY)) {
+      JsonNode summaryArray = JsonUtil.get(PARTITION_FIELD_SUMMARY, jsonNode);
+      Preconditions.checkArgument(
+          summaryArray.isArray(),
+          "Invalid JSON node for partition field summaries: non-array (%s)",
+          summaryArray);
+
+      ImmutableList.Builder<ManifestFile.PartitionFieldSummary> builder = ImmutableList.builder();
+      for (JsonNode summaryNode : summaryArray) {
+        ManifestFile.PartitionFieldSummary summary =
+            PartitionFieldSummaryParser.fromJson(summaryNode);
+        builder.add(summary);
+      }
+
+      partitionFieldSummaries = builder.build();
+    }
+
+    ByteBuffer keyMetadata = JsonUtil.getByteBufferOrNull(KEY_METADATA, jsonNode);
+
+    return new GenericManifestFile(
+        path,
+        length,
+        specId,
+        manifestContent,
+        sequenceNumber,
+        minSequenceNumber,
+        addedSnapshotId,
+        partitionFieldSummaries,
+        keyMetadata,
+        addedFilesCount,
+        addedRowsCount,
+        existingFilesCount,
+        existingRowsCount,
+        deletedFilesCount,
+        deletedRowsCount);
+  }
+
+  private static class PartitionFieldSummaryParser {
+    private static final String CONTAINS_NULL = "contains-null";
+    private static final String CONTAINS_NAN = "contains-nan";
+    private static final String LOWER_BOUND = "lower-bound";
+    private static final String UPPER_BOUND = "upper-bound";
+
+    private PartitionFieldSummaryParser() {}
+
+    static void toJson(ManifestFile.PartitionFieldSummary summary, JsonGenerator generator)
+        throws IOException {
+      Preconditions.checkArgument(summary != null, "Invalid partition field summary: null");
+      Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");
+
+      generator.writeStartObject();
+
+      generator.writeBooleanField(CONTAINS_NULL, summary.containsNull());
+
+      if (summary.containsNaN() != null) {
+        generator.writeBooleanField(CONTAINS_NAN, summary.containsNaN());
+      }
+
+      if (summary.lowerBound() != null) {
+        generator.writeFieldName(LOWER_BOUND);
+        SingleValueParser.toJson(Types.BinaryType.get(), summary.lowerBound(), generator);
+      }
+
+      if (summary.upperBound() != null) {
+        generator.writeFieldName(UPPER_BOUND);
+        SingleValueParser.toJson(Types.BinaryType.get(), summary.upperBound(), generator);
+      }
+
+      generator.writeEndObject();
+    }
+
+    static ManifestFile.PartitionFieldSummary fromJson(JsonNode jsonNode) {
+      Preconditions.checkArgument(
+          jsonNode != null, "Invalid JSON node for partition field summary: null");
+      Preconditions.checkArgument(
+          jsonNode.isObject(),
+          "Invalid JSON node for partition field summary: non-object (%s)",
+          jsonNode);
+
+      boolean containsNull = JsonUtil.getBool(CONTAINS_NULL, jsonNode);
+      Boolean containsNaN = null;
+      if (jsonNode.has(CONTAINS_NAN)) {
+        containsNaN = JsonUtil.getBool(CONTAINS_NAN, jsonNode);
+      }
+
+      ByteBuffer lowerBound = null;
+      if (jsonNode.has(LOWER_BOUND)) {
+        lowerBound =
+            (ByteBuffer)
+                SingleValueParser.fromJson(Types.BinaryType.get(), jsonNode.get(LOWER_BOUND));
+      }
+
+      ByteBuffer upperBound = null;
+      if (jsonNode.has(UPPER_BOUND)) {
+        upperBound =
+            (ByteBuffer)
+                SingleValueParser.fromJson(Types.BinaryType.get(), jsonNode.get(UPPER_BOUND));
+      }
+
+      if (containsNaN != null) {
+        return new GenericPartitionFieldSummary(containsNull, containsNaN, lowerBound, upperBound);
+      } else {
+        return new GenericPartitionFieldSummary(containsNull, lowerBound, upperBound);
+      }
+    }
+  }
+}
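
`ManifestFileParser` is only invoked through the task parsers in this diff, but the same `JsonUtil` helpers that `FileIOParser` uses can drive it standalone; a hedged sketch, assuming an existing `ManifestFile` instance `manifestFile`:

```java
String json = JsonUtil.generate(gen -> ManifestFileParser.toJson(manifestFile, gen), false);
ManifestFile copy = JsonUtil.parse(json, ManifestFileParser::fromJson);
```
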
diff --git a/core/src/main/java/org/apache/iceberg/ScanTaskParser.java b/core/src/main/java/org/apache/iceberg/ScanTaskParser.java
index 9447d0668a1f..67e44cea7d07 100644
--- a/core/src/main/java/org/apache/iceberg/ScanTaskParser.java
+++ b/core/src/main/java/org/apache/iceberg/ScanTaskParser.java
@@ -30,7 +30,10 @@ public class ScanTaskParser {
 
   private enum TaskType {
     FILE_SCAN_TASK("file-scan-task"),
-    DATA_TASK("data-task");
+    DATA_TASK("data-task"),
+    FILES_TABLE_TASK("files-table-task"),
+    ALL_MANIFESTS_TABLE_TASK("all-manifests-table-task"),
+    MANIFEST_ENTRIES_TABLE_TASK("manifest-entries-task");
 
     private final String value;
 
@@ -45,6 +48,12 @@ public static TaskType fromTypeName(String value) {
         return FILE_SCAN_TASK;
       } else if (DATA_TASK.typeName().equalsIgnoreCase(value)) {
         return DATA_TASK;
+      } else if (FILES_TABLE_TASK.typeName().equalsIgnoreCase(value)) {
+        return FILES_TABLE_TASK;
+      } else if (ALL_MANIFESTS_TABLE_TASK.typeName().equalsIgnoreCase(value)) {
+        return ALL_MANIFESTS_TABLE_TASK;
+      } else if (MANIFEST_ENTRIES_TABLE_TASK.typeName().equalsIgnoreCase(value)) {
+        return MANIFEST_ENTRIES_TABLE_TASK;
       } else {
         throw new IllegalArgumentException("Unknown task type: " + value);
       }
@@ -74,6 +83,17 @@ private static void toJson(FileScanTask fileScanTask, JsonGenerator generator)
     if (fileScanTask instanceof StaticDataTask) {
       generator.writeStringField(TASK_TYPE, TaskType.DATA_TASK.typeName());
       DataTaskParser.toJson((StaticDataTask) fileScanTask, generator);
+    } else if (fileScanTask instanceof BaseFilesTable.ManifestReadTask) {
+      generator.writeStringField(TASK_TYPE, TaskType.FILES_TABLE_TASK.typeName());
+      FilesTableTaskParser.toJson((BaseFilesTable.ManifestReadTask) fileScanTask, generator);
+    } else if (fileScanTask instanceof AllManifestsTable.ManifestListReadTask) {
+      generator.writeStringField(TASK_TYPE, TaskType.ALL_MANIFESTS_TABLE_TASK.typeName());
+      AllManifestsTableTaskParser.toJson(
+          (AllManifestsTable.ManifestListReadTask) fileScanTask, generator);
+    } else if (fileScanTask instanceof BaseEntriesTable.ManifestReadTask) {
+      generator.writeStringField(TASK_TYPE, TaskType.MANIFEST_ENTRIES_TABLE_TASK.typeName());
+      ManifestEntriesTableTaskParser.toJson(
+          (BaseEntriesTable.ManifestReadTask) fileScanTask, generator);
     } else if (fileScanTask instanceof BaseFileScanTask
         || fileScanTask instanceof BaseFileScanTask.SplitScanTask) {
       generator.writeStringField(TASK_TYPE, TaskType.FILE_SCAN_TASK.typeName());
@@ -98,6 +118,12 @@ private static FileScanTask fromJson(JsonNode jsonNode, boolean caseSensitive) {
         return FileScanTaskParser.fromJson(jsonNode, caseSensitive);
       case DATA_TASK:
         return DataTaskParser.fromJson(jsonNode);
+      case FILES_TABLE_TASK:
+        return FilesTableTaskParser.fromJson(jsonNode);
+      case ALL_MANIFESTS_TABLE_TASK:
+        return AllManifestsTableTaskParser.fromJson(jsonNode);
+      case MANIFEST_ENTRIES_TABLE_TASK:
+        return ManifestEntriesTableTaskParser.fromJson(jsonNode);
       default:
         throw new UnsupportedOperationException("Unsupported task type: " + taskType.typeName());
     }
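
An unrecognized discriminator still fails fast in `TaskType.fromTypeName`, e.g.:

```java
ScanTaskParser.fromJson("{\"task-type\":\"bogus-task\"}", false);
// throws IllegalArgumentException: Unknown task type: bogus-task
```
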
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 923db6bbd68f..abb2ee6cc3e9 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -43,6 +43,7 @@
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.LocationUtil;
 import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PartitionUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SerializableSupplier;
 
@@ -331,7 +332,7 @@ public String toString() {
 
     this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber);
     this.schemasById = indexSchemas();
-    this.specsById = indexSpecs(specs);
+    this.specsById = PartitionUtil.indexSpecs(specs);
     this.sortOrdersById = indexSortOrders(sortOrders);
     this.refs = validateRefs(currentSnapshotId, refs, snapshotsById);
     this.statisticsFiles = ImmutableList.copyOf(statisticsFiles);
@@ -810,14 +811,6 @@ private Map<Integer, Schema> indexSchemas() {
     return builder.build();
   }
 
-  private static Map<Integer, PartitionSpec> indexSpecs(List<PartitionSpec> specs) {
-    ImmutableMap.Builder<Integer, PartitionSpec> builder = ImmutableMap.builder();
-    for (PartitionSpec spec : specs) {
-      builder.put(spec.specId(), spec);
-    }
-    return builder.build();
-  }
-
   private static Map<Integer, SortOrder> indexSortOrders(List<SortOrder> sortOrders) {
     ImmutableMap.Builder<Integer, SortOrder> builder = ImmutableMap.builder();
     for (SortOrder sortOrder : sortOrders) {
@@ -1054,7 +1047,7 @@ public Builder setCurrentSchema(int schemaId) {
       this.specs =
           Lists.newArrayList(Iterables.transform(specs, spec -> updateSpecSchema(schema, spec)));
       specsById.clear();
-      specsById.putAll(indexSpecs(specs));
+      specsById.putAll(PartitionUtil.indexSpecs(specs));
 
       this.sortOrders =
           Lists.newArrayList(
diff --git a/core/src/main/java/org/apache/iceberg/io/FileIOParser.java b/core/src/main/java/org/apache/iceberg/io/FileIOParser.java
index 90ace239c2b7..2743b7b616cd 100644
--- a/core/src/main/java/org/apache/iceberg/io/FileIOParser.java
+++ b/core/src/main/java/org/apache/iceberg/io/FileIOParser.java
@@ -40,7 +40,7 @@ public static String toJson(FileIO io, boolean pretty) {
     return JsonUtil.generate(gen -> toJson(io, gen), pretty);
   }
 
-  private static void toJson(FileIO io, JsonGenerator generator) throws IOException {
+  public static void toJson(FileIO io, JsonGenerator generator) throws IOException {
     String impl = io.getClass().getName();
     Map<String, String> properties;
     try {
@@ -72,7 +72,7 @@ public static FileIO fromJson(String json, Object conf) {
     return JsonUtil.parse(json, node -> fromJson(node, conf));
   }
 
-  private static FileIO fromJson(JsonNode json, Object conf) {
+  public static FileIO fromJson(JsonNode json, Object conf) {
     Preconditions.checkArgument(json.isObject(), "Cannot parse FileIO from non-object: %s", json);
     String impl = JsonUtil.getString(FILE_IO_IMPL, json);
     Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, json);
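
These two overloads were previously private; widening them to public lets the new task parsers embed a `FileIO` inside a serialized task. The behavior is unchanged; e.g., with the output shape matching the expected JSON in the tests below:

```java
HadoopFileIO io = new HadoopFileIO();
io.initialize(ImmutableMap.of("k1", "v1"));
String json = FileIOParser.toJson(io, false);
// {"io-impl":"org.apache.iceberg.hadoop.HadoopFileIO","properties":{"k1":"v1"}}
FileIO restored = FileIOParser.fromJson(json, null); // null conf is accepted here
```
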
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
index 4a5f0c792baa..83735939317b 100644
--- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -98,4 +99,13 @@ public static StructLike coercePartition(
     projection.wrap(partition);
     return projection;
   }
+
+  public static Map<Integer, PartitionSpec> indexSpecs(List<PartitionSpec> specs) {
+    ImmutableMap.Builder<Integer, PartitionSpec> builder = ImmutableMap.builder();
+    for (PartitionSpec spec : specs) {
+      builder.put(spec.specId(), spec);
+    }
+
+    return builder.build();
+  }
 }
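
`indexSpecs` is hoisted verbatim from `TableMetadata` so the task parsers can rebuild the spec-id map after deserialization; a small usage sketch:

```java
Schema schema = new Schema(Types.NestedField.required(1, "data", Types.StringType.get()));
PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("data", 16).build();
Map<Integer, PartitionSpec> byId = PartitionUtil.indexSpecs(ImmutableList.of(spec));
// byId.get(spec.specId()) == spec; the returned map is immutable.
```
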
diff --git a/core/src/test/java/org/apache/iceberg/TestAllManifestsTableTaskParser.java b/core/src/test/java/org/apache/iceberg/TestAllManifestsTableTaskParser.java
new file mode 100644
index 000000000000..2f057d7bd5a8
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestAllManifestsTableTaskParser.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.JsonUtil;
+import org.apache.iceberg.util.PartitionUtil;
+import org.junit.jupiter.api.Test;
+
+public class TestAllManifestsTableTaskParser {
+  @Test
+  public void nullCheck() throws Exception {
+    StringWriter writer = new StringWriter();
+    JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+
+    assertThatThrownBy(() -> AllManifestsTableTaskParser.toJson(null, generator))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid manifest task: null");
+
+    assertThatThrownBy(() -> AllManifestsTableTaskParser.toJson(createTask(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON generator: null");
+
+    assertThatThrownBy(() -> AllManifestsTableTaskParser.fromJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON node for manifest task: null");
+  }
+
+  @Test
+  public void invalidJsonNode() throws Exception {
+    String jsonStr = "{\"str\":\"1\", \"arr\":[]}";
+    ObjectMapper mapper = new ObjectMapper();
+    JsonNode rootNode = mapper.reader().readTree(jsonStr);
+
+    assertThatThrownBy(() -> AllManifestsTableTaskParser.fromJson(rootNode.get("str")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid JSON node for manifest task: non-object ");
+
+    assertThatThrownBy(() -> AllManifestsTableTaskParser.fromJson(rootNode.get("arr")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid JSON node for manifest task: non-object ");
+  }
+
+  @Test
+  public void testParser() {
+    AllManifestsTable.ManifestListReadTask task = createTask();
+    String jsonStr = ScanTaskParser.toJson(task);
+    assertThat(jsonStr).isEqualTo(taskJson());
+    AllManifestsTable.ManifestListReadTask deserializedTask =
+        (AllManifestsTable.ManifestListReadTask) ScanTaskParser.fromJson(jsonStr, false);
+    assertTaskEquals(task, deserializedTask);
+  }
+
+  private AllManifestsTable.ManifestListReadTask createTask() {
+    Schema dataTableSchema = TestBase.SCHEMA;
+    HadoopFileIO fileIO = new HadoopFileIO();
+    fileIO.initialize(ImmutableMap.of("k1", "v1", "k2", "v2"));
+    Map<Integer, PartitionSpec> specsById =
+        PartitionUtil.indexSpecs(
+            Arrays.asList(PartitionSpec.builderFor(dataTableSchema).bucket("data", 16).build()));
+
+    return new AllManifestsTable.ManifestListReadTask(
+        dataTableSchema,
+        fileIO,
+        AllManifestsTable.MANIFEST_FILE_SCHEMA,
+        specsById,
+        "/path/manifest-list-file.avro",
+        Expressions.equal("id", 1),
+        1L);
+  }
+
+  private String taskJson() {
+    return "{\"task-type\":\"all-manifests-table-task\","
+        + "\"data-table-schema\":{\"type\":\"struct\",\"schema-id\":0,"
+        + "\"fields\":[{\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"},"
+        + "{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]},"
+        + "\"file-io\":{\"io-impl\":\"org.apache.iceberg.hadoop.HadoopFileIO\","
+        + "\"properties\":{\"k1\":\"v1\",\"k2\":\"v2\"}},"
+        + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{"
+        + "\"id\":14,\"name\":\"content\",\"required\":true,\"type\":\"int\"},"
+        + "{\"id\":1,\"name\":\"path\",\"required\":true,\"type\":\"string\"},"
+        + "{\"id\":2,\"name\":\"length\",\"required\":true,\"type\":\"long\"},"
+        + "{\"id\":3,\"name\":\"partition_spec_id\",\"required\":false,\"type\":\"int\"},"
+        + "{\"id\":4,\"name\":\"added_snapshot_id\",\"required\":false,\"type\":\"long\"},"
+        + "{\"id\":5,\"name\":\"added_data_files_count\",\"required\":false,\"type\":\"int\"},"
+        + "{\"id\":6,\"name\":\"existing_data_files_count\",\"required\":false,\"type\":\"int\"},"
+        + "{\"id\":7,\"name\":\"deleted_data_files_count\",\"required\":false,\"type\":\"int\"},"
+        + "{\"id\":15,\"name\":\"added_delete_files_count\",\"required\":true,\"type\":\"int\"},"
+        + "{\"id\":16,\"name\":\"existing_delete_files_count\",\"required\":true,\"type\":\"int\"},"
+        + "{\"id\":17,\"name\":\"deleted_delete_files_count\",\"required\":true,\"type\":\"int\"},"
+        + "{\"id\":8,\"name\":\"partition_summaries\",\"required\":false,\"type\":"
+        + "{\"type\":\"list\",\"element-id\":9,\"element\":{\"type\":\"struct\",\"fields\":[{"
+        + "\"id\":10,\"name\":\"contains_null\",\"required\":true,\"type\":\"boolean\"},"
+        + "{\"id\":11,\"name\":\"contains_nan\",\"required\":true,\"type\":\"boolean\"},"
+        + "{\"id\":12,\"name\":\"lower_bound\",\"required\":false,\"type\":\"string\"},"
+        + "{\"id\":13,\"name\":\"upper_bound\",\"required\":false,\"type\":\"string\"}]},\"element-required\":true}},"
+        + "{\"id\":18,\"name\":\"reference_snapshot_id\",\"required\":true,\"type\":\"long\"}]},"
+        + "\"partition-specs\":[{\"spec-id\":0,\"fields\":[{\"name\":\"data_bucket\","
+        + "\"transform\":\"bucket[16]\",\"source-id\":4,\"field-id\":1000}]}],"
+        + "\"manifest-list-Location\":\"/path/manifest-list-file.avro\","
+        + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1},"
+        + "\"reference-snapshot-id\":1}";
+  }
+
+  private void assertTaskEquals(
+      AllManifestsTable.ManifestListReadTask expected,
+      AllManifestsTable.ManifestListReadTask actual) {
+
+    HadoopFileIO expectedIO = (HadoopFileIO) expected.io();
+    HadoopFileIO actualIO = (HadoopFileIO) actual.io();
+    assertThat(actualIO.properties()).isEqualTo(expectedIO.properties());
+
+    assertThat(actual.dataTableSchema().asStruct())
+        .as("Data table schema should match")
+        .isEqualTo(expected.dataTableSchema().asStruct());
+    assertThat(actual.schema().asStruct())
+        .as("Schema should match")
+        .isEqualTo(expected.schema().asStruct());
+
+    assertThat(actual.specsById()).isEqualTo(expected.specsById());
+    assertThat(actual.manifestListLocation()).isEqualTo(expected.manifestListLocation());
+    assertThat(actual.residual().toString()).isEqualTo(expected.residual().toString());
+    assertThat(actual.referenceSnapshotId()).isEqualTo(expected.referenceSnapshotId());
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestFilesTableTaskParser.java b/core/src/test/java/org/apache/iceberg/TestFilesTableTaskParser.java
new file mode 100644
index 000000000000..bea60601377e
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestFilesTableTaskParser.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.JsonUtil;
+import org.apache.iceberg.util.PartitionUtil;
+import org.junit.jupiter.api.Test;
+
+public class TestFilesTableTaskParser {
+  @Test
+  public void nullCheck() throws Exception {
+    StringWriter writer = new StringWriter();
+    JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+
+    assertThatThrownBy(() -> FilesTableTaskParser.toJson(null, generator))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid files task: null");
+
+    assertThatThrownBy(() -> FilesTableTaskParser.toJson(createTask(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON generator: null");
+
+    assertThatThrownBy(() -> FilesTableTaskParser.fromJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON node for files task: null");
+  }
+
+  @Test
+  public void invalidJsonNode() throws Exception {
+    String jsonStr = "{\"str\":\"1\", \"arr\":[]}";
+    ObjectMapper mapper = new ObjectMapper();
+    JsonNode rootNode = mapper.reader().readTree(jsonStr);
+
+    assertThatThrownBy(() -> FilesTableTaskParser.fromJson(rootNode.get("str")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid JSON node for files task: non-object ");
+
+    assertThatThrownBy(() -> FilesTableTaskParser.fromJson(rootNode.get("arr")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid JSON node for files task: non-object ");
+  }
+
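+  // Round-trip: serialize the task via ScanTaskParser, check the exact JSON form,
+  // then parse it back and compare the tasks field by field.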
+  @Test
+  public void testParser() {
+    BaseFilesTable.ManifestReadTask task = createTask();
+    String jsonStr = ScanTaskParser.toJson(task);
+    assertThat(jsonStr).isEqualTo(taskJson());
+    BaseFilesTable.ManifestReadTask deserializedTask =
+        (BaseFilesTable.ManifestReadTask) ScanTaskParser.fromJson(jsonStr, false);
+    assertTaskEquals(task, deserializedTask);
+  }
+
+  private BaseFilesTable.ManifestReadTask createTask() {
+    Schema schema = TestBase.SCHEMA;
+    HadoopFileIO fileIO = new HadoopFileIO();
+    fileIO.initialize(ImmutableMap.of("k1", "v1", "k2", "v2"));
+    Map<Integer, PartitionSpec> specsById =
+        PartitionUtil.indexSpecs(
+            Arrays.asList(PartitionSpec.builderFor(schema).bucket("data", 16).build()));
+    ManifestFile manifestFile = TestManifestFileParser.createManifestFile();
+    return new BaseFilesTable.ManifestReadTask(
+        schema, fileIO, specsById, manifestFile, schema, Expressions.equal("id", 1));
+  }
+
+  private String taskJson() {
+    return "{\"task-type\":\"files-table-task\","
+        + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{"
+        + "\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"},"
+        + "{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]},"
+        + "\"projection\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{"
+        + "\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"},"
+        + "{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]},"
+        + "\"file-io\":{\"io-impl\":\"org.apache.iceberg.hadoop.HadoopFileIO\","
+        + "\"properties\":{\"k1\":\"v1\",\"k2\":\"v2\"}},"
+        + "\"partition-specs\":[{\"spec-id\":0,\"fields\":[{"
+        + "\"name\":\"data_bucket\",\"transform\":\"bucket[16]\",\"source-id\":4,\"field-id\":1000}]}],"
+        + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1},"
+        + "\"manifest-file\":{\"path\":\"/path/input.m0.avro\","
+        + "\"length\":5878,\"partition-spec-id\":0,\"content\":0,\"sequence-number\":1,\"min-sequence-number\":2,"
+        + "\"added-snapshot-id\":12345678901234567,"
+        + "\"added-files-count\":1,\"existing-files-count\":3,\"deleted-files-count\":0,"
+        + "\"added-rows-count\":10,\"existing-rows-count\":30,\"deleted-rows-count\":0,"
+        + "\"partition-field-summary\":[{\"contains-null\":true,\"contains-nan\":false,"
+        + "\"lower-bound\":\"0A000000\",\"upper-bound\":\"64000000\"}],\"key-metadata\":\"DB030000\"}}";
+  }
+
+  private void assertTaskEquals(
+      BaseFilesTable.ManifestReadTask expected, BaseFilesTable.ManifestReadTask actual) {
+    assertThat(actual.schema().asStruct())
+        .as("Schema should match")
+        .isEqualTo(expected.schema().asStruct());
+    assertThat(actual.projection().asStruct())
+        .as("Projected schema should match")
+        .isEqualTo(expected.projection().asStruct());
+
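+    // FileIO instances are not value-comparable, so compare serialized properties instead.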
+    HadoopFileIO expectedIO = (HadoopFileIO) expected.io();
+    HadoopFileIO actualIO = (HadoopFileIO) actual.io();
+    assertThat(actualIO.properties()).isEqualTo(expectedIO.properties());
+
+    assertThat(actual.specsById()).isEqualTo(expected.specsById());
+    assertThat(actual.residual().toString()).isEqualTo(expected.residual().toString());
+    assertThat(actual.manifest()).isEqualTo(expected.manifest());
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestFileParser.java b/core/src/test/java/org/apache/iceberg/TestManifestFileParser.java
new file mode 100644
index 000000000000..5a6e99c984f0
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestManifestFileParser.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.JsonUtil;
+import org.junit.jupiter.api.Test;
+
+public class TestManifestFileParser {
+  @Test
+  public void nullCheck() throws Exception {
+    StringWriter writer = new StringWriter();
+    JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+
+    assertThatThrownBy(() -> ManifestFileParser.toJson(null, generator))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid manifest file: null");
+
+    assertThatThrownBy(() -> ManifestFileParser.toJson(createManifestFile(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON generator: null");
+
+    assertThatThrownBy(() -> ManifestFileParser.fromJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON node for manifest file: null");
+  }
+
+  @Test
+  public void invalidJsonNode() throws Exception {
+    String jsonStr = "{\"str\":\"1\", \"arr\":[]}";
+    ObjectMapper mapper = new ObjectMapper();
+    JsonNode rootNode = mapper.reader().readTree(jsonStr);
+
+    assertThatThrownBy(() -> ManifestFileParser.fromJson(rootNode.get("str")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid JSON node for manifest file: non-object ");
+
+    assertThatThrownBy(() -> ManifestFileParser.fromJson(rootNode.get("arr")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid JSON node for manifest file: non-object ");
+  }
+
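+  // Serialization-only check; fromJson is exercised indirectly through the
+  // task-level round-trip tests that embed a manifest file.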
+  @Test
+  public void testParser() throws Exception {
+    ManifestFile manifest = createManifestFile();
+    String jsonStr = JsonUtil.generate(gen -> ManifestFileParser.toJson(manifest, gen), false);
+    assertThat(jsonStr).isEqualTo(manifestFileJson());
+  }
+
+  static ManifestFile createManifestFile() {
+    ByteBuffer lowerBound = Conversions.toByteBuffer(Types.IntegerType.get(), 10); // "0A000000"
+    ByteBuffer upperBound = Conversions.toByteBuffer(Types.IntegerType.get(), 100); // "64000000"
+    List<ManifestFile.PartitionFieldSummary> partitionFieldSummaries =
+        Arrays.asList(new GenericPartitionFieldSummary(true, false, lowerBound, upperBound));
+    ByteBuffer keyMetadata = Conversions.toByteBuffer(Types.IntegerType.get(), 987); // "DB030000"
+
+    return new GenericManifestFile(
+        "/path/input.m0.avro", // path
+        5878L, // length
+        0, // partition spec id
+        ManifestContent.DATA, // content
+        1L, // sequence number
+        2L, // min sequence number
+        12345678901234567L, // added snapshot id
+        1, // added files count
+        10L, // added rows count
+        3, // existing files count
+        30L, // existing rows count
+        0, // deleted files count
+        0L, // deleted rows count
+        partitionFieldSummaries,
+        keyMetadata);
+  }
+
+  private String manifestFileJson() {
+    return "{\"path\":\"/path/input.m0.avro\","
+        + "\"length\":5878,\"partition-spec-id\":0,\"content\":0,\"sequence-number\":1,\"min-sequence-number\":2,"
+        + "\"added-snapshot-id\":12345678901234567,\"added-files-count\":1,\"existing-files-count\":3,\"deleted-files-count\":0,"
+        + "\"added-rows-count\":10,\"existing-rows-count\":30,\"deleted-rows-count\":0,"
+        + "\"partition-field-summary\":[{\"contains-null\":true,\"contains-nan\":false,"
+        + "\"lower-bound\":\"0A000000\",\"upper-bound\":\"64000000\"}],"
+        + "\"key-metadata\":\"DB030000\"}";
+  }
+}