
Recover corrupted GZIP files, setup of Java-Ruby mixed environment. #249

Merged 16 commits into logstash-plugins:main on Jul 19, 2022

Conversation

mashhurs
Contributor

@mashhurs mashhurs commented Jun 21, 2022

Issue description

When using the GZIP encoding option with the AWS S3 output plugin, there are cases where Logstash may crash. When Logstash crashes, the GZIP stream is left open and the file has no tail (trailer). At restart, Logstash uploads the corrupted file to S3, and customers who download and use the file discover that it is corrupted.
This PR aims to recover the corrupted file at restart time and upload a healthy GZIP file to S3.

FYI: I have asked for details on where/when/how Logstash crashed, and will investigate once I get a response.

Acceptance Criteria

Logstash should always upload healthy GZIP/plain text files to AWS S3.

Solution explanation

See the Additional Notes section below for details: option 4 is recommended and implemented.

Testing

Unit testing

  • The GzipUtilTest unit test class covers success and failure cases for the recover, compress, and decompress operations; a sketch of one such test follows.
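
For illustration, a recovery test might look roughly like the following. This is a hypothetical sketch, assuming JUnit 4, the recover(String, String) signature quoted later in the review, and a made-up fixture path standing in for a gzip capture from a crashed run; it is not the actual GzipUtilTest code.

import static org.junit.Assert.assertTrue;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.junit.Test;
import org.logstash.plugins.outputs.s3.GzipUtil;

public class GzipUtilRecoverySketchTest {

    @Test
    public void recoverWritesHealthyContentFromTruncatedGzip() throws Exception {
        // Hypothetical fixture: a gzip file with a valid header but no tail,
        // as left behind when Logstash is killed mid-stream.
        Path corrupted = Paths.get("src/test/resources/corrupted.txt.gz");
        Path recovered = Files.createTempFile("recovered", ".txt");

        GzipUtil.recover(corrupted.toString(), recovered.toString());

        // Every healthy block before the truncation point should survive.
        assertTrue(Files.size(recovered) > 0);
    }
}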

E2E testing

  • After pulling this change and setting up a local dev environment with the Logstash code, set up your AWS S3 bucket with a policy that allows file uploads.
  • Use an S3 output setting such as:
input {
    stdin { }
}
output {
    s3 {
        region => "region"
        bucket => "logstash-mashhur-test"
        codec => "json_lines"
        canned_acl => "private"
        prefix => "test-%{+YYYY.MM.dd}"
        additional_settings => {
            "force_path_style" => true
        }
        upload_queue_size => 10
        upload_workers_count => 2
        size_file => 1024
        rotation_strategy => "size"
        encoding => "gzip"
        temporary_directory => "/path/to/local/temp/s3"
        validate_credentials_on_root_bucket => false
    }

    stdout {}
}
  • Run Logstash and type some input on the console, then kill Logstash with kill -9 PID, where the PID can be fetched with ps -la | grep logstash
  • Re-run Logstash and check the S3 bucket to confirm the upload; you may see a warning on the console that Logstash recovered the file:
[2022-06-22T15:38:43,062][WARN ][org.logstash.plugins.outputs.s3.GzipUtil][main] Corrupted file recovered, path:/path/of/file-recovered.txt
  • Download the S3 object with aws s3 cp s3://logstash-mashhur-test/path-to-file.txt.gz /local/path/to/download/
  • Unzip the file with gunzip downloaded-file.txt.gz and open it to verify the contents.

Additional Notes

Discussed solutions:

  1. [Gzip recovery] Pull out the GZIP header and decompress healthy blocks until we find a tail, in plain Ruby.

    • Ruby's GZIP interfaces (GzipReader, GzipWriter) do not allow byte-by-byte access to GZIP blocks/headers; they always validate the file on any read action. NOT FEASIBLE.
  2. Logstash writes plain text, and zips the file only when uploading to S3.

    • In this case the size policy would not work properly: when users set output -> s3 -> size_file, their intention is the zipped file size, not the size of the growing plain text. For example, with a typical ~10x compression ratio on text logs, rotating on size_file bytes of plain text would produce gzip objects roughly a tenth of the intended size. Since Logstash loses accuracy, this option is low priority.
  3. External tool

    • Existing tools such as gz-recover work as expected and are open source. However, in the long term (e.g. if security vulnerabilities are found) this option could become costly, since we might need to contribute upstream or work around poor maintenance. Low priority.
  4. [Gzip recovery - RECOMMENDED, IMPLEMENTED] Pull out the GZIP header and decompress healthy blocks until we find a tail, in Java.

    • Java has good input stream interfaces for accessing a file's bytes. Initially we read the file and pulled out the header ourselves, but the GZIPInputStream interface does the same whenever a stream starts, so we don't need explicit header or tail handling; catching the Unexpected end of ZLIB input stream exception satisfies our condition. A minimal sketch of this approach follows the list.

    • However, this solution requires the plugin environment to be a mixed Ruby-Java one. Some plugins (input-beats, input-http) have already taken the initiative to move to such an environment, so this change also includes the Java-Ruby mixed environment setup.
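
A minimal sketch of the recommended approach follows. The class and method names here are hypothetical; only the copy-until-failure idea and the Unexpected end of ZLIB input stream message come from this PR, and this is not the plugin's actual GzipUtil implementation.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPInputStream;

public class GzipRecoverySketch {

    // Message GZIPInputStream reports when the stream ends before the tail.
    private static final String CORRUPTED_FILE_ERROR = "Unexpected end of ZLIB input stream";

    // Decompress every healthy block of `corrupted` into `recovered`,
    // treating a missing tail as the expected crash signature.
    public static void recover(Path corrupted, Path recovered) throws IOException {
        try (InputStream in = new GZIPInputStream(Files.newInputStream(corrupted));
             OutputStream out = Files.newOutputStream(recovered)) {
            byte[] buffer = new byte[8192];
            int read;
            try {
                while ((read = in.read(buffer)) != -1) {
                    out.write(buffer, 0, read);
                }
            } catch (IOException e) {
                // Anything other than the truncated-stream message is a real error.
                if (!CORRUPTED_FILE_ERROR.equals(e.getMessage())) {
                    throw e;
                }
            }
        }
    }
}

After this recovery step, the decompressed content can be re-compressed into a fresh, well-formed gzip file before upload, which appears to be the role of the compress/decompress pair in GzipUtil.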

logstash-output-s3.gemspec (outdated review thread, resolved)
@mashhurs mashhurs marked this pull request as ready for review June 22, 2022 22:28
@mashhurs mashhurs linked an issue Jun 22, 2022 that may be closed by this pull request
build.gradle (outdated review thread, resolved)
Contributor

@robbavey robbavey left a comment


First pass through

} catch (IOException exception) {
    // raise an exception if the caught exception is not end-of-ZLIB related.
    if (CORRUPTED_FILE_ERROR.equals(exception.getMessage()) == false) {
        logger.error("Error occurred while compressing the file, error={}", exception.getMessage());
Contributor


Given the javadoc, is the intent to only log here? This will continue the upload to S3 in the event of an unrecoverable zip file.

Question here is what do we want to do in the event of a corrupt/unrecoverable gzip file? Upload it to S3 anyway, keep it locally (maybe with a .damaged extension?), or discard it?

Contributor Author

@mashhurs mashhurs Jun 24, 2022


Well, so far the only case where the GZIP might be corrupted is the no-tail case, where the stream is left open when LS crashes. If other cases occur, LS currently leaves the file as it is, and users can find it under the temporary path. No delete/upload action will proceed. Most importantly, we don't damage/crash LS and let it continue.
Let me know your thoughts on whether we can improve the situation.

Contributor


Unless I'm missing something, I don't think LS will leave the file as it is. With this change, the error will be logged, but processing will continue as normal: uploading the corrupt file to S3 and deleting the file from local storage.

Contributor Author


The file path changes when a corrupted file is recovered (file_path = self.recover(file_path)) and a new temp file is returned (with create_from_existing_file). If LS does not recover the file, its size stays zero (in the temporary file) and LS does not remove the file (source).

Contributor


If the file size is 0, the temporary file and the original "corrupt" file will be deleted - the process of deleting a "temporary file" also deletes the folder it is a part of.

Contributor Author

@mashhurs mashhurs Jun 27, 2022


Oh, it is because of the temp folder... yes, I agree that the temp folder will be removed. Thanks for letting me know; reproducing such a case is not easy. What do you think about replacing the size check logic with the following?

# s3.rb:404L
if temp_file.size > 0
  @logger.debug? && @logger.debug("Recovering from crash and uploading", :path => temp_file.path)
  @crash_uploader.upload_async(temp_file,
                               :on_complete => method(:clean_temporary_file),
                               :upload_options => upload_options)
end
if @encoding != "gzip" && temp_file.size != 0
  clean_temporary_file(temp_file)
end

build.gradle (outdated review thread, resolved)
.select { |file| ::File.file?(file) }
.each do |file|
temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path)
.select { |file_path| ::File.file?(file_path) }
Contributor


We will need to figure out a way between the recovery code and this glob to ignore "recovery in progress" files. Currently, in the event of a failure/crash during recovery, this glob definition will pick up the original file to be recovered plus any files in mid-recovery, i.e. ls.s3<>part0.txt.gz, ls.s3<>.part0-recovered.txt, and ls.s3<>.part0-recovered.txt.gz. This can lead to a few different outcomes: duplicate data being uploaded to S3, potentially multiple -recovered.txt and -recovered.txt.gz extensions, and, in the event one of these files is empty, clean_temporary_file(temp_file) being called, which deletes the entire subdirectory that all of these files are stored in.

Contributor Author


Is Dir.glob().select{} async in this case?
The possible cases would be:

  • multiple LS instances accessing the same S3 temp path, but as we discussed offline, the recommendation is to provide a separate path for each instance;
  • unrecoverable zip files, which always remain.

So, I have added logic that figures out which files are under the recovery process (under_recovery_files) and skips a file if it is in this list.

 */
public static void recover(String sourcePath, String targetPath) {
    try {
        GzipUtil.decompress(Paths.get(sourcePath), Paths.get(targetPath));
Contributor


If the recovery returns 0 bytes, then we should probably skip the rest of the recovery process, to avoid uploading an empty zip file

Contributor Author


This will be covered by s3.rb:L405~L413.

Contributor


It is possible for an "empty" gzip file to be created which includes the header but no content, which I don't think will be covered by the above case, and will create an empty recovery file and upload a gzip version.

(I could make this happen with input { generator { count => 250 } })
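
For context on how such header-only files arise: Java's GZIPOutputStream writes the 10-byte gzip header as soon as it is constructed, so a crash before any content is flushed and the stream closed leaves a file that is non-empty on disk yet decompresses to zero bytes. A minimal standalone sketch (not plugin code):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class HeaderOnlyGzipDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        new GZIPOutputStream(sink); // the constructor writes the gzip header immediately
        // Simulate a crash: the stream is never flushed or closed.
        System.out.println("bytes written: " + sink.size()); // prints 10: the bare header
    }
}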

Contributor Author

@mashhurs mashhurs Jul 3, 2022


Thanks!
Technically, the gzip file is created when input is received, which means the header is written at initial gzip creation. I am not sure how likely this is in our scenario, but I manually confirmed/tested the possible empty zip cases and updated the PR.

@mashhurs mashhurs requested a review from robbavey June 29, 2022 01:27
@mashhurs mashhurs requested a review from robbavey July 3, 2022 16:42
@mashhurs mashhurs requested a review from robbavey July 9, 2022 15:26
lib/logstash/outputs/s3/temporary_file.rb (review thread, resolved)
:upload_options => upload_options)
end
# do not remove if Logstash tries to recover but fails
if @encoding != GZIP_ENCODING && temp_file.size != 0
Contributor


I don't think this logic is correct for uploading non-gzip based recovery - I think you may end up cleaning the temporary file here, which may happen before upload_async is complete.

Contributor


I think it might be worthwhile to see some tests here - what happens with:

  • standard plain text recovery
  • standard gzip recovery
  • empty plain text recovery
  • empty gzip recovery

Contributor Author

@mashhurs mashhurs Jul 12, 2022


  1. Whoa, no idea why I removed the else case; thanks for the notice, that would have been a crash!

  2. Test cases

It seems the default (none or plain text) encoding cases are covered in the integration test restore_from_crash_spec.rb, and I have added the cases where Logstash restores the file, uploads it, and removes it locally.

  • Run the integration test; as we synced offline, make sure to set the AWS env params, including AWS_SESSION_TOKEN:
    bundle exec rspec logstash-plugins/logstash-output-s3/spec/integration/restore_from_crash_spec.rb:69 -t integration:true
  • Result logs:
Finished in 1 minute 22.39 seconds (files took 9.97 seconds to load)
3 examples, 0 failures

@mashhurs mashhurs requested a review from robbavey July 12, 2022 22:50
Contributor

@robbavey robbavey left a comment


LGTM. Nice job getting this through!

@mashhurs mashhurs merged commit 61208c4 into logstash-plugins:main Jul 19, 2022
@mashhurs mashhurs changed the title from "Initial setup of Java-Ruby mixed environment, Gzip recover Java class…" to "Recover corrupted GZIP files, setup of Java-Ruby mixed environment." Dec 27, 2022
@xiaomaisuii

Hello, I saw that the fix code #249 has been merged into the logstash-plugins/logstash-integration-aws repository. Could you let me know when this fix will be merged into the logstash-plugins/logstash-output-s3 repository, or if it has already been fixed in a particular version of logstash?


Successfully merging this pull request may close these issues.

Corrupted gzip file on abnormal shutdown