feat: get list streams by MicBun · Pull Request #875 · trufnetwork/node · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat: get list streams #875

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions internal/migrations/001-common-actions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1089,4 +1089,44 @@ CREATE OR REPLACE ACTION filter_streams_by_existence(
for $i in 1..array_length($filtered_dp) {
RETURN NEXT $filtered_dp[$i], $filtered_sid[$i];
}
};

/**
 * list_streams: Returns one page of rows from the streams table, optionally
 * restricted to a single data provider.
 *
 * Parameters:
 *   $data_provider - Provider to match, compared case-insensitively.
 *                    NULL or '' disables the filter.
 *   $limit         - Maximum number of rows to return. NULL or 0 selects the
 *                    maximum page size of 5000. Values above 5000 and
 *                    negative values are rejected with an error.
 *   $offset        - Number of rows to skip. NULL is treated as 0. Negative
 *                    values are rejected with an error.
 *   $order_by      - One of 'created_at DESC', 'created_at ASC',
 *                    'stream_id ASC', 'stream_id DESC', 'stream_type ASC',
 *                    'stream_type DESC'. NULL or '' selects 'created_at DESC'.
 *
 * Returns: rows of (data_provider, stream_id, stream_type, created_at).
 */
CREATE OR REPLACE ACTION list_streams(
    $data_provider TEXT,
    $limit INT,
    $offset INT,
    $order_by TEXT
) PUBLIC view returns table(
    data_provider TEXT,
    stream_id TEXT,
    stream_type TEXT,
    created_at INT8
) {
    if $limit > 5000 {
        ERROR('Limit exceeds maximum allowed value of 5000');
    }
    if $limit IS NULL OR $limit = 0 {
        $limit := 5000;
    }
    -- $limit is non-NULL from here on; reject negatives explicitly so the
    -- caller gets a clear message instead of a failure from the LIMIT clause.
    if $limit < 0 {
        ERROR('Limit must not be negative');
    }

    if $offset IS NULL {
        $offset := 0;
    }
    if $offset < 0 {
        ERROR('Offset must not be negative');
    }

    if $order_by IS NULL OR $order_by = '' {
        $order_by := 'created_at DESC';
    }

    -- Exactly one CASE below yields a non-NULL sort key for a recognised
    -- $order_by; the others evaluate to NULL for every row and have no effect.
    -- NOTE(review): an unrecognised $order_by matches no CASE, so rows come
    -- back in no guaranteed order; rows that tie on the chosen key are also
    -- not ordered relative to each other.
    RETURN SELECT data_provider,
                  stream_id,
                  stream_type,
                  created_at
           FROM streams
           WHERE $data_provider IS NULL OR $data_provider = '' OR LOWER(data_provider) = LOWER($data_provider)
           ORDER BY
               CASE WHEN $order_by = 'created_at DESC' THEN created_at END DESC,
               CASE WHEN $order_by = 'created_at ASC' THEN created_at END ASC,
               CASE WHEN $order_by = 'stream_id ASC' THEN stream_id END ASC,
               CASE WHEN $order_by = 'stream_id DESC' THEN stream_id END DESC,
               CASE WHEN $order_by = 'stream_type ASC' THEN stream_type END ASC,
               CASE WHEN $order_by = 'stream_type DESC' THEN stream_type END DESC
           LIMIT $limit OFFSET $offset;
};
37 changes: 37 additions & 0 deletions tests/streams/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestQueryStream(t *testing.T) {
WithQueryTestSetup(testQUERY07_AdditionalInsertWillFetchLatestRecord(t)),
WithComposedQueryTestSetup(testAGGR03_ComposedStreamWithWeights(t)),
WithQueryTestSetup(testBatchInsertAndQueryRecord(t)),
WithQueryTestSetup(testListStreams(t)),
},
}, testutils.GetTestOptions())
}
Expand Down Expand Up @@ -746,3 +747,39 @@ func testBatchInsertAndQueryRecord(t *testing.T) func(ctx context.Context, platf
return nil
}
}

// testListStreams returns a test step that calls procedure.ListStreams for the
// platform deployer's data provider (limit 10, offset 0, ordered by
// "created_at DESC") and asserts that the result matches a three-row markdown
// table.
// NOTE(review): primitiveChildStreamId, composedStreamId and primitiveStreamId
// are not defined in this function — presumably fixtures declared elsewhere in
// the test package; confirm.
func testListStreams(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error {
return func(ctx context.Context, platform *kwilTesting.Platform) error {
// Render the deployer bytes as a 0x-prefixed lowercase hex string.
dataProviderStr := fmt.Sprintf("0x%x", platform.Deployer)
result, err := procedure.ListStreams(ctx, procedure.ListStreamsInput{
Platform: platform,
Height: 0,
DataProvider: dataProviderStr,
Limit: 10,
Offset: 0,
OrderBy: "created_at DESC",
})
if err != nil {
return errors.Wrap(err, "error listing streams")
}

// Expected rows; every row shares created_at = 1, so the row order shown
// here is not decided by the created_at sort key alone.
expected := fmt.Sprintf(`
| data_provider | stream_id | stream_type | created_at |
|---------------|-----------|-------------|------------|
| %s | %s | primitive | 1 |
| %s | %s | composed | 1 |
| %s | %s | primitive | 1 |
`,
Comment on lines +766 to +772
Copy link
Preview
Copilot AI Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The multi-line raw string literal used for the expected markdown table is indented, which may introduce unintended leading whitespace that could cause the table assertion to fail; consider removing or trimming the indentation.

Suggested change
expected := fmt.Sprintf(`
| data_provider | stream_id | stream_type | created_at |
|---------------|-----------|-------------|------------|
| %s | %s | primitive | 1 |
| %s | %s | composed | 1 |
| %s | %s | primitive | 1 |
`,
expected := strings.TrimSpace(fmt.Sprintf(`
| data_provider | stream_id | stream_type | created_at |
|---------------|-----------|-------------|------------|
| %s | %s | primitive | 1 |
| %s | %s | composed | 1 |
| %s | %s | primitive | 1 |
`,

Copilot uses AI. Check for mistakes.

dataProviderStr, primitiveChildStreamId.String(),
dataProviderStr, composedStreamId.String(),
dataProviderStr, primitiveStreamId.String(),
)

// Compare the returned rows against the expected markdown table.
table.AssertResultRowsEqualMarkdownTable(t, table.AssertResultRowsEqualMarkdownTableInput{
Actual: result,
Expected: expected,
})
return nil
}
}
53 changes: 53 additions & 0 deletions tests/streams/utils/procedure/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,3 +497,56 @@ func DisableTaxonomy(ctx context.Context, input DisableTaxonomyInput) error {

return nil
}

// ListStreamsInput bundles the arguments accepted by ListStreams.
type ListStreamsInput struct {
	// Platform provides the engine, database handle, deployer identity and
	// transaction id used to perform the call.
	Platform *kwilTesting.Platform
	// Height is the block height placed in the call's block context.
	Height int64
	// DataProvider is forwarded as the first argument of "list_streams".
	DataProvider string
	// Limit and Offset are forwarded as the second and third arguments of
	// "list_streams".
	Limit, Offset int
	// OrderBy is forwarded as the fourth argument of "list_streams".
	OrderBy string
}

// ListStreams calls the "list_streams" action through the platform engine,
// acting as the platform deployer, and returns the rows it yields after
// passing them through processResultRows.
//
// An error is returned when the deployer bytes cannot be converted to an
// Ethereum address, when the engine call itself fails, or when the call
// result carries an error.
func ListStreams(ctx context.Context, input ListStreamsInput) ([]ResultRow, error) {
	platform := input.Platform

	deployer, err := util.NewEthereumAddressFromBytes(platform.Deployer)
	if err != nil {
		return nil, errors.Wrap(err, "error in ListStreams")
	}

	// The deployer is used both as the signer (raw bytes) and as the caller
	// (address form) of the call.
	engineContext := &common.EngineContext{
		TxContext: &common.TxContext{
			Ctx:          ctx,
			BlockContext: &common.BlockContext{Height: input.Height},
			TxID:         platform.Txid(),
			Signer:       platform.Deployer,
			Caller:       deployer.Address(),
		},
	}

	// Positional arguments, in the order the action declares them.
	args := []any{
		input.DataProvider,
		input.Limit,
		input.Offset,
		input.OrderBy,
	}

	var collected [][]any
	// Copy each row's values before storing them so the stored slice is
	// independent of the one handed to the callback.
	collect := func(row *common.Row) error {
		rowCopy := make([]any, len(row.Values))
		copy(rowCopy, row.Values)
		collected = append(collected, rowCopy)
		return nil
	}

	callResult, err := platform.Engine.Call(engineContext, platform.DB, "", "list_streams", args, collect)
	if err != nil {
		return nil, errors.Wrap(err, "error in ListStreams")
	}
	// The engine reports action-level failures on the result rather than
	// through the returned error.
	if callResult.Error != nil {
		return nil, errors.Wrap(callResult.Error, "error in ListStreams")
	}

	return processResultRows(collected)
}
Loading
0