
Bulk Load CDK: CSV Support, S3V2Usage #47005

Merged: johnny-schmidt merged 2 commits into master from jschmidt/s3v2/issue-9969/csv on Oct 19, 2024

Conversation

johnny-schmidt (Contributor)

What

Adds CSV support. Uses the old printer with AirbyteValue/Schema directly.

Conversions aren't complete, and there are no tests. I will come back and shore all that up after I've proved out Avro and Parquet.

Bonuses

  • I added a cleaner, but didn't wire it up yet because it has dependencies.
  • I figured out how to get java.io.Writer to work without resorting to runBlocking, so streamingUpload now accepts a block that operates on a Writer, letting the Apache CSVPrinter wrap it (see the sketch after this list); some callouts:
    • It requires async dispatch in an anonymous scope, so the S3 client has to call start() and await the returned job (wonky, but limited to the client, so the implementor still gets a clean interface).
    • Writer::close is a no-op (the contract is that the writer is closed when the block exits).
    • flush throws, since there's no need for it yet; I may wire it to uploadPart later if one arises.
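
A minimal sketch of that shape (the interface and helper names here are illustrative, not the actual CDK signatures):

    import java.io.Writer
    import org.apache.commons.csv.CSVFormat

    // Hypothetical client interface: the block receives a Writer that is valid only
    // for the duration of the call. Per the contract above, the writer is closed
    // automatically when the block exits (Writer::close is a no-op).
    interface StreamingUploader {
        suspend fun streamingUpload(key: String, block: suspend (Writer) -> Unit)
    }

    // Usage: the Apache Commons CSVPrinter wraps the provided Writer directly,
    // so the CSV layer never needs runBlocking.
    suspend fun uploadCsv(uploader: StreamingUploader, rows: List<List<String>>) {
        uploader.streamingUpload("records.csv") { writer ->
            val printer = CSVFormat.DEFAULT.print(writer)
            rows.forEach { printer.printRecord(it) }
        }
    }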

@johnny-schmidt johnny-schmidt requested a review from a team as a code owner on October 18, 2024 at 18:56

@octavia-squidington-iii octavia-squidington-iii added the area/connectors (Connector related issues), CDK (Connector Development Kit), and connectors/destination/s3-v2 labels on Oct 18, 2024
@johnny-schmidt johnny-schmidt force-pushed the jschmidt/s3v2/issue-9969/compression branch from 5f740f3 to ca54e41 on October 18, 2024 at 20:40
@johnny-schmidt johnny-schmidt force-pushed the jschmidt/s3v2/issue-9969/compression branch from ca54e41 to eb1a86a on October 18, 2024 at 20:58
@johnny-schmidt johnny-schmidt force-pushed the jschmidt/s3v2/issue-9969/csv branch 2 times, most recently from 70c1199 to d40830b, on October 18, 2024 at 21:04
edgao (Contributor) left a comment:


wasn't sure about the multipart upload stuff, everything else makes sense


    private fun convertInner(value: String, field: AirbyteType): AirbyteValue {
        return when (field) {
            is ArrayType -> ArrayValue(value.split(",").map { convertInner(it, field.items.type) })
edgao (Contributor):

isn't the serialized string [1,2,3]? i.e. shouldn't we also strip the square brackets (or maybe just parse it as JSON)?

johnny-schmidt (Contributor, Author):

Should be:

    is ArrayType -> value.deserializeToNode().elements().asSequence().map { it.toAirbyteValue(field.items.type) }.toList().let(::ArrayValue)
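
Spelled out, a sketch of that corrected branch in context (assuming deserializeToNode() parses the string into a Jackson JsonNode and toAirbyteValue maps a node to an AirbyteValue for the given type; both names come from the snippet above, exact signatures assumed):

    private fun convertInner(value: String, field: AirbyteType): AirbyteValue =
        when (field) {
            // Parse the serialized JSON array ("[1,2,3]") instead of splitting on
            // commas, which would break on nested arrays and quoted strings.
            is ArrayType ->
                value.deserializeToNode()
                    .elements()
                    .asSequence()
                    .map { it.toAirbyteValue(field.items.type) }
                    .toList()
                    .let(::ArrayValue)
            // Remaining type branches elided.
            else -> StringValue(value)
        }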


    suspend fun start(): Job =
        CoroutineScope(Dispatchers.IO).launch {
            for (unit in work) {
                uploadPart()
edgao (Contributor):

is this supposed to be unit()? it doesn't seem like we ever actually invoke the function

johnny-schmidt (Contributor, Author):

I'd partially converted it to a Unit channel, since all the work items were uploadPart. Fixed.
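
For reference, the fixed shape would be something like this (a sketch, assuming work is a Channel of suspend work items):

    suspend fun start(): Job =
        CoroutineScope(Dispatchers.IO).launch {
            for (workItem in work) {
                workItem() // actually invoke the queued item, e.g. { uploadPart() }
            }
        }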

    override fun write(str: String) {
        wrappingBuffer.write(str.toByteArray(Charsets.UTF_8))
        if (underlyingBuffer.size() >= partSize) {
            runBlocking { work.send { uploadPart() } }
edgao (Contributor):

not sure I follow here - don't we need to call uploadPart() directly to avoid a race condition? (i.e. someone calls write() while uploadPart is still in-flight)

or should this be work.send { uploadPart(underlyingBuffer); underlyingBuffer = newBuffer() }?

johnny-schmidt (Contributor, Author):

You're right, we need to sequence the work items. The most straightforward thing to do is just to push whatever would have been in the body of the function into the channel.

    suspend fun run(block: suspend (OutputStream) -> Unit) = coroutineScope {
        log.info {
            "Starting multipart upload to ${response.bucket}/${response.key} (${response.uploadId}"
        }
        launch {
            for (item in workQueue) {
                item()
            }
            complete()
        }
        UploadStream().use {
            block(it)
        }
    }

    inner class UploadStream : OutputStream() {
        override fun close() = runBlocking {
            workQueue.send { workQueue.close() }
        }

        override fun flush() = runBlocking {
            workQueue.send { wrappingBuffer.flush() }
        }

        override fun write(b: Int) = runBlocking {
            workQueue.send {
                wrappingBuffer.write(b)
                if (underlyingBuffer.size() >= partSize) {
                    uploadPart()
                }
            }
        }

        override fun write(b: ByteArray) = runBlocking {
            workQueue.send {
                wrappingBuffer.write(b)
                if (underlyingBuffer.size() >= partSize) {
                    uploadPart()
                }
            }
        }
    }

This also simplifies the caller to:

        val upload =
            S3MultipartUpload(
                client,
                response,
                ByteArrayOutputStream(),
                streamProcessor,
                uploadConfig
            )
        upload.run(block)

Also note I converted the writer into an output stream. This will be necessary for Avro.
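
A quick sketch of why OutputStream is the right common denominator (the format-layer function names here are illustrative): a character Writer can always be layered over a byte OutputStream, but not the reverse, and Avro's DataFileWriter writes raw bytes.

    import java.io.OutputStream
    import java.io.OutputStreamWriter
    import org.apache.commons.csv.CSVFormat

    // CSV: wrap the byte stream in a Writer and hand that to the printer.
    fun writeCsvTo(stream: OutputStream, rows: List<List<String>>) {
        val printer = CSVFormat.DEFAULT.print(OutputStreamWriter(stream, Charsets.UTF_8))
        rows.forEach { printer.printRecord(it) }
        printer.flush() // push the writer's character buffer down into the stream
    }

    // Avro: DataFileWriter targets the OutputStream directly (schema/records elided), e.g.
    //   DataFileWriter(GenericDatumWriter<GenericRecord>(schema)).create(schema, stream)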

@johnny-schmidt johnny-schmidt force-pushed the jschmidt/s3v2/issue-9969/compression branch from eb1a86a to 6050e74 on October 18, 2024 at 21:33
Base automatically changed from jschmidt/s3v2/issue-9969/compression to master on October 18, 2024 at 21:48
@johnny-schmidt johnny-schmidt force-pushed the jschmidt/s3v2/issue-9969/csv branch 2 times, most recently from 22081d5 to 266f11c, on October 18, 2024 at 22:31
edgao (Contributor) left a comment:


(copying from DM: let's add some comments about why we need the channel and why we need to be an OutputStream, but otherwise lgtm)
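
Something like the following could serve as that comment (wording assumed; not the comment that actually landed):

    /**
     * Why a work channel: OutputStream's write methods are synchronous, but part
     * uploads are suspend functions. Queuing each write/upload as a work item on a
     * channel and consuming it from a single coroutine keeps everything in order,
     * so a write can never race an in-flight uploadPart().
     *
     * Why an OutputStream (not a Writer): downstream formats differ. CSV wants a
     * character Writer, while Avro writes raw bytes; OutputStream is the common
     * denominator, and a Writer can always be layered on top.
     */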

@johnny-schmidt johnny-schmidt merged commit 4f534ef into master Oct 19, 2024
36 checks passed
@johnny-schmidt johnny-schmidt deleted the jschmidt/s3v2/issue-9969/csv branch October 19, 2024 00:05