Variable OCCURS clause #239

Closed
tr11 opened this issue Jan 17, 2020 · 22 comments
Labels: accepted (Accepted for implementation), bug (Something isn't working)

Comments

tr11 commented Jan 17, 2020

Describe the bug

Variable OCCURS fails if we don't specify variable record lengths. This may be very similar to the discussion of #156, but I think the issue there is slightly different.

To Reproduce

Run the following with pyspark

import tempfile
from pyspark.sql.functions import explode

with tempfile.NamedTemporaryFile('wb') as f:
    f.write(b'   5ABC1ABC2ABC3ABC4ABC5   5DEF1DEF2DEF3DEF4DEF5')
    f.flush()

    (spark
      .read
      .format("cobol")
      .option("copybook_contents", """
          01 RECORD.
              02 COUNT PIC 9(4).
              02 GROUP OCCURS 0 TO 5 TIMES DEPENDING ON COUNT.
                  03 TEXT   PIC X(3).
                  03 FIELD  PIC 9.
      """)
      .option("encoding", 'ascii')
      .option("variable_size_occurs", "true")
      .load(f.name)
    ).select('RECORD.COUNT', explode('RECORD.GROUP')).show()

    (spark
      .read
      .format("cobol")
      .option("copybook_contents", """
          01 RECORD.
              02 COUNT PIC 9(4).
              02 GROUP OCCURS 0 TO 11 TIMES DEPENDING ON COUNT.
                  03 TEXT   PIC X(3).
                  03 FIELD  PIC 9.
      """)
      .option("encoding", 'ascii')
      .option("variable_size_occurs", "true")
      .load(f.name)
    ).select('RECORD.COUNT', explode('RECORD.GROUP')).show()    
    
    (spark
      .read
      .format("cobol")
      .option("copybook_contents", """
          01 RECORD.
              02 COUNT PIC 9(4).
              02 GROUP OCCURS 0 TO 10 TIMES DEPENDING ON COUNT.
                  03 TEXT   PIC X(3).
                  03 FIELD  PIC 9.
      """)
      .option("encoding", 'ascii')
      .option("variable_size_occurs", "true")
      .load(f.name)
    ).select('RECORD.COUNT', explode('RECORD.GROUP')).show()    

Expected behavior

All three examples above should return the same output:

+------------+--------+
|RECORD.COUNT|     col|
+------------+--------+
|           5|[ABC, 1]|
|           5|[ABC, 2]|
|           5|[ABC, 3]|
|           5|[ABC, 4]|
|           5|[ABC, 5]|
|           5|[DEF, 1]|
|           5|[DEF, 2]|
|           5|[DEF, 3]|
|           5|[DEF, 4]|
|           5|[DEF, 5]|
+------------+--------+

but the second one returns only the rows from the first record:

+------------+--------+
|RECORD.COUNT|     col|
+------------+--------+
|           5|[ABC, 1]|
|           5|[ABC, 2]|
|           5|[ABC, 3]|
|           5|[ABC, 4]|
|           5|[ABC, 5]|
+------------+--------+

and the third one fails with a file size check.

Additional context

This issue seems to be a consequence of the fact that the files I have to process do not have a leading RDW block or a field specifying record lengths. Essentially, the variable OCCURS clause means that in my case the record needs to be read in full, but then we need to backtrack the number of bytes that were not needed before reading the following record.
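For reference, the test file above is 48 bytes: two logical records of 4 + 5*4 = 24 bytes each. The three outcomes are consistent with the reader stepping through the file by the maximum record length implied by the copybook rather than by the OCCURS-dependent length: with OCCURS 0 TO 5 the maximum is 4 + 5*4 = 24 bytes, which happens to match the actual length; with 0 TO 11 it is 4 + 11*4 = 48 bytes, so the whole file is consumed as a single record; and with 0 TO 10 it is 44 bytes, which does not divide 48, hence the file size check failure.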

tr11 added the "bug" label Jan 17, 2020
yruslan commented Jan 18, 2020

Variable size OCCURS produces variable size records. And variable size records are only supported if records have an RDW header or a record length field.

The idea of backtracking is interesting and might be the way to overcome the limitation. At first glance, it seems doable. I will think about the details next week.

The issue to solve is related to processing such files at scale. Currently, a sparse index is generated based on RDWs, which provides a way to split a variable record length file between partitions. But the index generation happens before records are parsed, so the actual array sizes are unknown at that stage.

To implement the backtracking, the reader needs to:

  • gather all information on the locations of the array length (DEPENDING ON) fields, the array element sizes, and the ordering and nesting of the arrays;
  • use this information when the sparse index is generated, so that files are split properly between partitions;
  • implement the backtracking algorithm in the record decoding stage.
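A minimal sketch of the kind of per-array metadata this would amount to (the type and field names are hypothetical, not actual Cobrix types):

    case class VarArrayInfo(
      fixedBytesBefore: Int,          // fixed bytes between the previous array (or record start) and the DEPENDING ON field
      countFieldLength: Int,          // size in bytes of the DEPENDING ON field
      elementTrailingBytes: Int,      // fixed bytes of one element after its last nested array (the whole element if none)
      maxOccurs: Int,                 // upper bound from "OCCURS m TO n TIMES"
      nested: Seq[VarArrayInfo] = Nil // variable-size arrays nested inside each element
    )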

tr11 commented Jan 18, 2020 via email

tr11 commented Jan 18, 2020

In fact, something like this could even simplify the code, as there would be no need for a distinction between fixed- and variable-length records; the length would be determined by:

  1. the existence of an RDW block (using the same header parser we use today)
  2. a field within the record
  3. the record itself, by parsing the OCCURS dependents

yruslan commented Jan 20, 2020

Yes, it seems like a very good idea to have a special custom record header parser for the situation where variable_size_occurs = true and there are no RDWs and no record length field.

  • That parser could fetch the input stream up to the first OCCURS size field, calculate the next chunk, fetch data up to the next OCCURS size field, and so forth (see the sketch below).
  • Finally, it could fetch the rest of the data up to the end of the record.
  • Since record header parsers are used by the index builder automatically, no other logic needs to be updated.

> In fact, something like this could even simplify the code, as there would be no need for a distinction between fixed- and variable-length records; the length would be determined by:
>
>   1. the existence of an RDW block (using the same header parser we use today)
>   2. a field within the record [size]
>   3. the record itself, by parsing the OCCURS dependents

Points 1-2 are how it already works. Yes, we may not need to distinguish between fixed record length files and variable record length files. But the simplest case, fixed record length files, uses Spark's binaryRecords() API and is still more efficient than any of the cases where record sizes are variable.
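A minimal sketch of that chunk-by-chunk scan (all names are hypothetical, not the actual Cobrix record header parser API; ASCII display numerics are assumed for the DEPENDING ON fields):

    import java.io.DataInputStream

    // VarArrayInfo as sketched after the earlier comment, repeated here for completeness:
    case class VarArrayInfo(fixedBytesBefore: Int, countFieldLength: Int,
                            elementTrailingBytes: Int, maxOccurs: Int,
                            nested: Seq[VarArrayInfo] = Nil)

    // Returns the actual length of one record by reading only up to each DEPENDING ON
    // field and skipping the array bodies it describes.
    def scanRecordLength(stream: DataInputStream,
                         arrays: Seq[VarArrayInfo],
                         trailingFixedBytes: Int): Int = {
      var length = 0
      for (a <- arrays) {
        // read up to and including the DEPENDING ON (count) field of this array
        val head = new Array[Byte](a.fixedBytesBefore + a.countFieldLength)
        stream.readFully(head)
        val count = new String(head.takeRight(a.countFieldLength), "ASCII").trim.toInt
        length += head.length
        // walk each occurrence, recursing into nested variable-size arrays
        for (_ <- 0 until math.min(count, a.maxOccurs))
          length += scanRecordLength(stream, a.nested, a.elementTrailingBytes)
      }
      stream.skipBytes(trailingFixedBytes) // fixed fields after the last array
      length + trailingFixedBytes
    }

For the copybook in the description this amounts to a single VarArrayInfo(0, 4, 4, 5) with trailingFixedBytes = 0, giving 4 + 5*4 = 24 bytes per record.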

tr11 commented Jan 20, 2020 via email

yruslan added the "accepted" label Jan 21, 2020
yruslan commented Jan 21, 2020

> Maybe we could change the logic of computing BinaryProperties to also keep
> track of offsets of dependedOn fields.

Yes, that might be helpful.
I'll probably go with an underlying custom record parser implementation. The parser will be used in the case of a file without RDWs or a record length field. It will be provided with the offsets and types of the DEPENDING ON fields and the corresponding OCCURS element sizes.

I'll look into this issue in a week or two. There are some high priority things I need to finish.

> What I meant was that we wouldn't have to tell cobrix explicitly whether
> the record is fixed or not.

Yes, this is exactly how it works now. The .option("is_record_sequence", "true") specifies that the file has RDWs. The name is misleading. Maybe we should add a synonym, say, has_rdw (or similar). Funnily enough, this option already has a synonym is_xcom since the headers added to the beginning of each record were thought to be added by XCOM. 😄

yruslan commented Feb 6, 2020

  • Is it possible to have several variable-length arrays in one record in your use case?
  • Is it possible to have a variable-length array inside a variable-length array in your use case?

I'm trying to figure out whether support for simple cases is sufficient or whether we need a more generic implementation.

tr11 commented Feb 6, 2020

Yeah, I have a few files with multiple variable OCCURS, some of them nested!

What I implemented in the past, and can do here too, is a recursive size method that gives the size of a group given a record. The output would be something akin to Option[int, struct of offsets and datatypes] that could be applied to a record. If there are no variable OCCURS, it's just an int and we can skip parsing the record.
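A rough Scala rendering of that idea (the original implementation is Python; these names are illustrative and not Cobrix APIs):

    // The size of a group is either a constant or something computed from the record bytes.
    sealed trait GroupSize
    case class FixedSize(bytes: Int) extends GroupSize
    case class VariableSize(compute: Array[Byte] => Int) extends GroupSize

    // If every child of a group is fixed-size, the group itself is fixed-size,
    // so fully fixed records can skip per-record size computation entirely.
    def combine(children: Seq[GroupSize]): GroupSize =
      if (children.forall(_.isInstanceOf[FixedSize]))
        FixedSize(children.collect { case FixedSize(n) => n }.sum)
      else
        VariableSize(record => children.map {
          case FixedSize(n)    => n
          case VariableSize(f) => f(record)
        }.sum)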

yruslan commented Feb 6, 2020

The way the custom record header parser interface is implemented, it has the following limitations:

  • The number of bytes needed to determine the size of a record must be known in advance, and it cannot be bigger than the size of the current record. For multiple nested variable-size arrays this number cannot generally be determined.
  • The buffer used is a forward-only buffer. If a record is read past its end, the current position in the buffer cannot be moved back.

Given the above limitations, a new interface for custom record header parsers needs to be implemented in order to support this use case.

I'm wondering why such complicated files do not contain RDWs or a record length field.
Is it a limitation of the tool that transfers files from mainframes?

yruslan commented Feb 6, 2020

In order to implement this use case scalably and as generically as possible, we need a sparse index builder that takes an AST and an instance of SimpleStream and returns a sparse index in the form of ArrayBuffer[SparseIndexEntry].
Basically, IndexGenerator.sparseIndexGenerator should be rewritten to account for this particular case.

A very helpful thing from your side would be a unit test that uses a copybook with several variable-size arrays, and a data file with various array sizes, including 0, so that the feature can be easily tested once implemented.
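One possible shape for such a test copybook, wrapped in a Scala string the way a unit test might hold it (illustrative only: the field names are made up, and strict COBOL restricts where the DEPENDING ON object may appear, so this is intended purely as parser test input). Test records would then vary each count between 0 and its maximum.

    val nestedVarOccursCopybook: String =
      """        01  RECORD.
                  02 COUNT1 PIC 9(1).
                  02 GROUP1 OCCURS 0 TO 3 TIMES DEPENDING ON COUNT1.
                     03 INNER-COUNT PIC 9(1).
                     03 INNER-GROUP OCCURS 0 TO 2 TIMES DEPENDING ON INNER-COUNT.
                        04 FIELD1 PIC X(2).
                  02 COUNT2 PIC 9(1).
                  02 GROUP2 OCCURS 0 TO 4 TIMES DEPENDING ON COUNT2.
                     03 FIELD2 PIC 9(3).
      """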

tr11 commented Feb 6, 2020

> I'm wondering why such complicated files do not contain RDWs or a record length field.
> Is it a limitation of the tool that transfers files from mainframes?

These are files that we receive from external vendors and have no control over. They structure the files such that there's a 10-byte header that defines the segment. From the header we can determine the maximum size of the record, but we only learn the actual size as we read it.

tr11 commented Feb 6, 2020

> In order to implement this use case scalably and as generically as possible, we need a sparse index builder that takes an AST and an instance of SimpleStream and returns a sparse index in the form of ArrayBuffer[SparseIndexEntry].
> Basically, IndexGenerator.sparseIndexGenerator should be rewritten to account for this particular case.

To make sure, we're doing

tr11 closed this as completed Feb 6, 2020
tr11 reopened this Feb 6, 2020
yruslan commented Feb 7, 2020

Makes sense. Thanks!

I now have a sketch of a solution in mind. I will proceed by creating another interface, called something like StreamRecordExtractor, that will convert a SimpleStream + AST into basically an iterator of Array[Byte]. This interface can be used by the index builder and by variable-length record readers.

In the future, this solution will provide an easy way to extend it with support for 'custom record extractors'. I was thinking that, if implemented properly, this 'custom extractor' interface might be easier to use for custom record formats than 'custom record header parsers'.
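A minimal sketch of what such an interface could look like (only the name StreamRecordExtractor comes from the comment above; the exact shape and the helper below are assumptions):

    // Yields raw record bytes one record at a time; implementations decide how a record
    // boundary is found (RDW, record length field, or variable OCCURS scanning).
    trait StreamRecordExtractor extends Iterator[Array[Byte]]

    // Hypothetical usage by an index builder or a record reader:
    def collectRecordOffsets(extractor: StreamRecordExtractor): Seq[Long] = {
      val offsets = scala.collection.mutable.ArrayBuffer.empty[Long]
      var offset = 0L
      while (extractor.hasNext) {
        offsets += offset
        offset += extractor.next().length
      }
      offsets.toSeq
    }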

tr11 commented Feb 7, 2020

If I understand correctly, this is exactly what I did for my Python parser + decoder. In fact, I have something similar to that interface and the following different extractors since the files we receive have a few different formats (!!):

  • FixedLenExtractor (just returns the same value all the time)
  • TrimmedFixLenExtractor (to accommodate a weird "fixed-len" format in which the last record is stripped of whitespaces and records are separated by a termination character)
  • VariableLenExtractor, which goes through OCCURS clauses and is coupled with a SegmentExtractor that chooses the appropriate segment based on the following variants:
    • LeadingBytesSegmentExtractor, which determines the segment from some function that reads the first few bytes of the record
    • CopybookSegmentExtractor, which is a subclass of LeadingBytesSegmentExtractor but applies a copybook to the first few bytes and extracts a certain field

Even though I don't have files with RDW headers, I implemented it anyway as a particular case of the LeadingBytesSegmentExtractor.

In particular, I only have one type of reader, which calls the length extractor before execution. If the file happens to be fixed-length, the extractor code skips pre-parsing the file and just returns a range of offsets for each record. Otherwise, the file gets pre-parsed to create the offsets.
The other benefit is that the same approach can be used for streaming, with the caveat that the whole record needs to be read to determine its size before we can backtrack and decode it. I hope this makes sense.
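A rough Scala rendering of that extractor family (the original is in Python; every name here is illustrative and none of these are existing Cobrix classes):

    // Decides how long the next record is; None signals end of data.
    trait LengthExtractor {
      def nextRecordLength(leadingBytes: Array[Byte]): Option[Int]
    }

    // Fixed-length files: always the same length, no pre-parsing needed.
    class FixedLenExtractor(recordLength: Int) extends LengthExtractor {
      def nextRecordLength(leadingBytes: Array[Byte]): Option[Int] =
        if (leadingBytes.isEmpty) None else Some(recordLength)
    }

    // Chooses a segment (and therefore a copybook layout) from the first bytes of a record.
    trait SegmentExtractor {
      def segmentId(leadingBytes: Array[Byte]): String
    }

    class LeadingBytesSegmentExtractor(headerSize: Int) extends SegmentExtractor {
      def segmentId(leadingBytes: Array[Byte]): String =
        new String(leadingBytes.take(headerSize), "ASCII").trim
    }

    // A VariableLenExtractor would combine a SegmentExtractor with the OCCURS scanning
    // sketched earlier in the thread, and a TrimmedFixLenExtractor would additionally
    // handle the terminator character and the whitespace-stripped last record.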

yruslan commented Feb 8, 2020

Yes, this makes perfect sense. Thank you!

  • TrimmedFixLenExtractor is something that might be nice to support in the future. It seems such files are quite common.
  • A stream class with backtracking is actually already implemented in Cobrix (SimpleMemoryStream) for experiments we did on very peculiar files. In the future, this class may be simplified and cleaned up so it can be used when backtracking is the easiest approach.

For now, I'll focus on the specific record extractor for fixed-record-length files with variable size OCCURS.

I'm just wondering. You mentioned the other day that you don't use Spark, but just the Cobol parser to achieve the mainframe file conversion. Do the feature requests you created mean that you are planning to use Spark after all?

tr11 commented Feb 8, 2020

> I'm just wondering. You mentioned the other day that you don't use Spark, but just the Cobol parser to achieve the mainframe file conversion. Do the feature requests you created mean that you are planning to use Spark after all?

I'm using cobrix with pyspark for all the files I can use it on, which at the moment are the fixed-length EBCDIC files. My current setup for the other files is suboptimal -- I use my custom parser/decoder in Python to output JSON files and then load them into Spark DataFrames.

I can help with whatever you need; I'm quite motivated to transition everything to cobrix.

yruslan added a commit that referenced this issue Feb 8, 2020
yruslan commented Feb 8, 2020

The variable size OCCURS feature shouldn't take long.
If you have an example similar to the one in the description, but with several variable-size OCCURS, including a nested variable-size OCCURS field, that would definitely speed things up. But it is completely up to you. I can generate such an example as well.

tr11 commented Feb 8, 2020

I actually have a test for my decoder that tests that format. I'll implement it on a separate branch here (that will of course fail).

It would be useful to have a list of tasks that need to be completed (such as adding tests, adding a certain class or method, etc.). I'll gladly take whatever task I can help with.

yruslan commented Feb 8, 2020

> I actually have a test for my decoder that tests that format. I'll implement it on a separate branch here (that will of course fail).

Great, thanks!

> It would be useful to have a list of tasks that need to be completed (such as adding tests, adding a certain class or method, etc.). I'll gladly take whatever task I can help with.

Thanks! I'll keep that in mind. For now, it seems that there are no big enough tasks to develop in parallel.

yruslan added a commit that referenced this issue Feb 9, 2020
yruslan added a commit that referenced this issue Feb 10, 2020
yruslan added a commit that referenced this issue Feb 10, 2020
tr11 commented Feb 12, 2020

yruslan commented Feb 13, 2020

Cool, thanks a lot! I hope to finish the implementation soon.

yruslan commented Feb 18, 2020

While working on this issue, I've found a bug in the support for nested OCCURS.

The support for variable OCCURS for fixed record length files is deployed as a snapshot:
spark-cobol_2.11 version 2.0.4-SNAPSHOT.

I plan to do more testing on this before releasing it.

You can check if it works for you if you want.
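For anyone wanting to try it from an sbt build, something along these lines should work (the group id is the usual Cobrix one; the snapshot repository location is an assumption and may differ):

    // build.sbt -- assumes Scala 2.11, matching spark-cobol_2.11; the repository URL is an assumption
    resolvers += "Sonatype snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
    libraryDependencies += "za.co.absa.cobrix" %% "spark-cobol" % "2.0.4-SNAPSHOT"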
