-
-
Save slvrtrn/392c6dd5371d84651a3c2d13ed32946a to your computer and use it in GitHub Desktop.
sockets take #1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ts-node --transpile-only node/socket.ts | |
inserting test data 0 | |
inserting test data 1 | |
inserting test data 2 | |
inserting test data 3 | |
inserting test data 4 | |
inserting test data 5 | |
inserting test data 6 | |
inserting test data 7 | |
inserting test data 8 | |
inserting test data 9 | |
streaming in test data | |
streaming in test data | |
inserting test data 0 | |
inserting test data 1 | |
inserting test data 2 | |
inserting test data 3 | |
inserting test data 4 | |
inserting test data 5 | |
inserting test data 6 | |
inserting test data 7 | |
inserting test data 8 | |
inserting test data 9 | |
inserting test data 0 | |
(node:428789) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 end listeners added to [Readable]. Use emitter.setMaxListeners() to increase limit | |
(Use `node --trace-warnings ...` to show where the warning was created) | |
inserting test data 1 | |
inserting test data 2 | |
inserting test data 3 | |
inserting test data 4 | |
inserting test data 5 | |
inserting test data 6 | |
inserting test data 7 | |
inserting test data 8 | |
inserting test data 9 | |
count rows in test_table [ { 'count()': '30' } ] | |
clickhouse cleanup | |
clickhouse cleanup done | |
Error: socket hang up | |
at connResetException (node:internal/errors:717:14) | |
at Socket.socketCloseListener (node:_http_client:475:25) | |
at Socket.emit (node:events:525:35) | |
at Socket.emit (node:domain:489:12) | |
at TCP.<anonymous> (node:net:322:12) { | |
code: 'ECONNRESET' | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { createClient, InsertResult } from '@clickhouse/client' | |
import process from 'node:process' | |
import Stream from 'node:stream' | |
export class ClickHouse { | |
#insertPromise: Promise<InsertResult> | undefined | |
#client | |
stream = new Stream.Readable({ | |
objectMode: true, | |
read() {}, | |
}) | |
constructor() { | |
this.#client = createClient({ | |
host: process.env.CLICKHOUSE_HOST, | |
username: 'default', | |
password: process.env.CLICKHOUSE_SECRET, | |
keep_alive: { | |
enabled: false, | |
// socket_ttl: 2500, | |
// retry_on_expired_socket: true, | |
}, | |
}) | |
} | |
async init() { | |
await this.#client | |
.command({ | |
query: ` | |
CREATE OR REPLACE TABLE test_table | |
( | |
id UInt32, | |
name String, | |
time DateTime64 | |
) ENGINE = Memory | |
`, | |
}) | |
.catch((error) => console.error('⚠️ clickhouse init error:', error)) | |
} | |
async createPromise(table: string) { | |
this.#insertPromise = this.#client | |
.insert({ | |
table, | |
values: this.stream, | |
format: 'JSONEachRow', | |
}) | |
.catch(async (error) => { | |
console.error(error) | |
process.exit(255) | |
}) | |
} | |
async streamTestData() { | |
console.log('streaming in test data') | |
for (let index = 0; index < 1000; index++) { | |
this.stream.push({ | |
id: index, | |
name: 'test', | |
time: Date.now(), | |
}) | |
} | |
} | |
// insert data using INSERT (not stream) with 5 sec sleep in between | |
async insertTestData() { | |
for (let index = 0; index < 10; index++) { | |
console.log('inserting test data', index) | |
await this.#client | |
.insert({ | |
table: 'test_table', | |
values: [ | |
{ | |
id: index, | |
time: Date.now(), | |
name: 'test', | |
}, | |
], | |
format: 'JSONEachRow', | |
}) | |
.catch(console.error) | |
await new Promise((resolve) => setTimeout(resolve, 5_000)) | |
} | |
} | |
async closeStreams() { | |
// count rows in test_table | |
const { data } = await ( | |
await this.#client.query({ query: 'SELECT count(*) FROM test_table' }) | |
).json<{ data: unknown }>() | |
console.log('count rows in test_table', data) | |
// close stream | |
console.log('clickhouse cleanup') | |
this.stream.push(null) | |
// stream.destroy() | |
// when the stream is closed, the insert stream can be awaited | |
if (this.#insertPromise !== undefined) { | |
await this.#insertPromise | |
} | |
await this.#client.close() | |
console.log('clickhouse cleanup done') | |
} | |
} | |
void (async () => { | |
// this passes | |
const test1 = async () => { | |
// await clickhouse.createPromise('test_table') | |
// await clickhouse.streamTestData() | |
await clickhouse.insertTestData() | |
} | |
// this passes | |
const test2 = async () => { | |
await clickhouse.createPromise('test_table') | |
await clickhouse.streamTestData() | |
// await clickhouse.insertTestData() | |
} | |
// this fails with SOCKET_TIMEOUT | |
const test3 = async () => { | |
await clickhouse.createPromise('test_table') | |
await clickhouse.streamTestData() | |
await clickhouse.insertTestData() | |
} | |
// this fails with Error: socket hang up ECONNRESET | |
const test4 = async () => { | |
await clickhouse.createPromise('test_table') | |
// await clickhouse.streamTestData() | |
await clickhouse.insertTestData() | |
} | |
const clickhouse = new ClickHouse() | |
await clickhouse.init() | |
await test1() | |
await test2() | |
await test3() | |
await test4() | |
await clickhouse.closeStreams() | |
})() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment