Skip to content

Instantly share code, notes, and snippets.

@slvrtrn

slvrtrn/out Secret

Created November 1, 2023 22:19
Show Gist options
  • Save slvrtrn/392c6dd5371d84651a3c2d13ed32946a to your computer and use it in GitHub Desktop.
Save slvrtrn/392c6dd5371d84651a3c2d13ed32946a to your computer and use it in GitHub Desktop.
sockets take #1
ts-node --transpile-only node/socket.ts
inserting test data 0
inserting test data 1
inserting test data 2
inserting test data 3
inserting test data 4
inserting test data 5
inserting test data 6
inserting test data 7
inserting test data 8
inserting test data 9
streaming in test data
streaming in test data
inserting test data 0
inserting test data 1
inserting test data 2
inserting test data 3
inserting test data 4
inserting test data 5
inserting test data 6
inserting test data 7
inserting test data 8
inserting test data 9
inserting test data 0
(node:428789) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 end listeners added to [Readable]. Use emitter.setMaxListeners() to increase limit
(Use `node --trace-warnings ...` to show where the warning was created)
inserting test data 1
inserting test data 2
inserting test data 3
inserting test data 4
inserting test data 5
inserting test data 6
inserting test data 7
inserting test data 8
inserting test data 9
count rows in test_table [ { 'count()': '30' } ]
clickhouse cleanup
clickhouse cleanup done
Error: socket hang up
at connResetException (node:internal/errors:717:14)
at Socket.socketCloseListener (node:_http_client:475:25)
at Socket.emit (node:events:525:35)
at Socket.emit (node:domain:489:12)
at TCP.<anonymous> (node:net:322:12) {
code: 'ECONNRESET'
}
import { createClient, InsertResult } from '@clickhouse/client'
import process from 'node:process'
import Stream from 'node:stream'
export class ClickHouse {
#insertPromise: Promise<InsertResult> | undefined
#client
stream = new Stream.Readable({
objectMode: true,
read() {},
})
constructor() {
this.#client = createClient({
host: process.env.CLICKHOUSE_HOST,
username: 'default',
password: process.env.CLICKHOUSE_SECRET,
keep_alive: {
enabled: false,
// socket_ttl: 2500,
// retry_on_expired_socket: true,
},
})
}
async init() {
await this.#client
.command({
query: `
CREATE OR REPLACE TABLE test_table
(
id UInt32,
name String,
time DateTime64
) ENGINE = Memory
`,
})
.catch((error) => console.error('⚠️ clickhouse init error:', error))
}
async createPromise(table: string) {
this.#insertPromise = this.#client
.insert({
table,
values: this.stream,
format: 'JSONEachRow',
})
.catch(async (error) => {
console.error(error)
process.exit(255)
})
}
async streamTestData() {
console.log('streaming in test data')
for (let index = 0; index < 1000; index++) {
this.stream.push({
id: index,
name: 'test',
time: Date.now(),
})
}
}
// insert data using INSERT (not stream) with 5 sec sleep in between
async insertTestData() {
for (let index = 0; index < 10; index++) {
console.log('inserting test data', index)
await this.#client
.insert({
table: 'test_table',
values: [
{
id: index,
time: Date.now(),
name: 'test',
},
],
format: 'JSONEachRow',
})
.catch(console.error)
await new Promise((resolve) => setTimeout(resolve, 5_000))
}
}
async closeStreams() {
// count rows in test_table
const { data } = await (
await this.#client.query({ query: 'SELECT count(*) FROM test_table' })
).json<{ data: unknown }>()
console.log('count rows in test_table', data)
// close stream
console.log('clickhouse cleanup')
this.stream.push(null)
// stream.destroy()
// when the stream is closed, the insert stream can be awaited
if (this.#insertPromise !== undefined) {
await this.#insertPromise
}
await this.#client.close()
console.log('clickhouse cleanup done')
}
}
void (async () => {
// this passes
const test1 = async () => {
// await clickhouse.createPromise('test_table')
// await clickhouse.streamTestData()
await clickhouse.insertTestData()
}
// this passes
const test2 = async () => {
await clickhouse.createPromise('test_table')
await clickhouse.streamTestData()
// await clickhouse.insertTestData()
}
// this fails with SOCKET_TIMEOUT
const test3 = async () => {
await clickhouse.createPromise('test_table')
await clickhouse.streamTestData()
await clickhouse.insertTestData()
}
// this fails with Error: socket hang up ECONNRESET
const test4 = async () => {
await clickhouse.createPromise('test_table')
// await clickhouse.streamTestData()
await clickhouse.insertTestData()
}
const clickhouse = new ClickHouse()
await clickhouse.init()
await test1()
await test2()
await test3()
await test4()
await clickhouse.closeStreams()
})()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment