feat: gateway: initial dev

This commit is contained in:
Lucas Colombo 2024-03-28 13:38:57 -03:00
parent 4e95609367
commit 1a9e51d830
Signed by: lucas
GPG Key ID: EF34786CFEFFAE35
31 changed files with 4267 additions and 56 deletions

3
.gitignore vendored
View File

@ -1,3 +1,6 @@
/target
.data/*.log
.task
/.env
/rustler.db*

49
.vscode/settings.json vendored
View File

@ -1,25 +1,38 @@
{
"files.exclude": {
// config
"Taskfile.yaml": true,
".cocorc": true,
".task": true,
".cargo": true,
// "Cargo.toml": true,
// // 🧩 modules
// "migration": true,
// // "entities": true,
// 📦
"Cargo.lock": true,
"target": true,
// //
// ".env": true,
// "Taskfile.yaml": true,
// ".cocorc": true,
// ".task": true,
// ".cargo": true,
// ".github": true,
// "rustfmt.toml": true,
// // "**/**/Cargo.toml": true,
// 📝 readmes
"**/**/README.md": true,
"LICENSE": true,
// // 📦
// "Cargo.lock": true,
// "target": true,
// 🗑
"check_size.py": true,
"*.code-workspace": true,
".gitignore": true,
".vscode": true,
".git": true,
// // 📝 readmes
// "**/**/README.md": true,
// "LICENSE": true,
// // 🗑
// "check_size.py": true,
// "*.code-workspace": true,
// // ".gitignore": true,
// ".vscode": true,
// ".git": true,
// // 🗄 database
// "rustler.db": true,
// "rustler.db-shm": true,
// "rustler.db-wal": true,
// "rustler.db.bkp": true,
}
}

3595
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -4,6 +4,7 @@ version = "0.0.0-alpha.0"
edition = "2021"
description = "🤠 » single-action data extractor"
authors = ["Lucas Colombo <lucas@lucode.ar>"]
publish = false
[profile.release]
strip = true
@ -16,10 +17,19 @@ overflow-checks = false
debug = 0
debug-assertions = false
[dependencies]
eyre = { version = "0.6.12", default-features = false, features = ["auto-install"] }
indoc = "2.0.4"
[[bin]]
name = "rustler"
path = "src/main.rs"
path = "app/main.rs"
[workspace]
members = [".", "migration", "entities", "grpc"]
[dependencies]
# internal
entities = { path = "entities" }
grpc = { path = "grpc" }
# external
eyre = { version = "0.6.12", default-features = false, features = ["auto-install"] }
tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread"] }
dotenvy = "0.15.7"

21
LICENSE
View File

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2023 Lucas Colombo <lucas@lucode.ar>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -7,3 +7,26 @@
<p align="center">
𝐫𝐮𝐬𝐭𝐥𝐞𝐫 is a web scraping service that scrapes several stock market providers for stock pricing data. It is built using the <code>Rust</code> programming language.
</p>
<br>
<br>
<br>
## Framweork
- [Tonic (grpc)](https://docs.rs/tonic/latest/tonic/index.html)
- websockets:
- [tokio-tungstenite](https://docs.rs/tokio-tungstenite/latest/tokio_tungstenite/)
- [fastwebsockets](https://crates.io/crates/fastwebsockets)
- [embedded-websocket](https://crates.io/crates/embedded-websocket) - bajo nivel - small
- [web-socket](https://crates.io/crates/web-socket) - supuestamente el mas rapido
- Rx Rust:
- [rxrust](https://crates.io/crates/rxrust)
- [another-rxrust](https://crates.io/crates/another-rxrust) parece un poco mejor
## Commands
```bash
# run migrations
task db:migrate
```

View File

@ -3,6 +3,16 @@
version: '3'
tasks:
db:up:
desc: 🗃️ run database migrations (up)
cmds:
- sea migrate up
db:down:
desc: 🗃️ run database migrations (down)
cmds:
- sea migrate down
run:watch:
desc: 🚀 watch rustler
cmds:

31
app/main.rs Normal file
View File

@ -0,0 +1,31 @@
use {
dotenvy::dotenv,
entities::sea_orm::{ConnectionTrait, Database, DbBackend, Statement},
eyre::Result,
std::sync::{Arc, Mutex},
};
// TODO: here we will trigger the start of both the grpc server and the websocket gateway
// look at: https://github.com/hyperium/tonic/discussions/740
#[tokio::main]
async fn main() -> Result<()> {
dotenv()?;
let db_conn_str = std::env::var("DATABASE_URL")?;
// let conn = Arc::new();
let conn = Arc::new(Database::connect(&db_conn_str).await?);
conn.query_one(Statement::from_string(
DbBackend::Sqlite,
r#"
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
"#,
))
.await?;
grpc::server::start(conn.clone()).await?;
Ok(())
}

View File

@ -52,5 +52,9 @@ for key, exe in files.items():
sizefile.parent.mkdir(parents=True, exist_ok=True)
print(
f'{head(key)}: {fmt(diff_str)} (prev: {old_size}kb, now: {fmt(f"{str(new_size)}kb")})'
)
with open(sizefile, 'w') as f:
f.write(f'{new_size}')

13
entities/Cargo.toml Normal file
View File

@ -0,0 +1,13 @@
[package]
name = "entities"
version = "0.1.0"
edition = "2021"
publish = false
[lib]
name = "entities"
path = "src/lib.rs"
[dependencies]
sea-orm = { version = "0.12.15", features = ["runtime-tokio-native-tls", "sqlx-sqlite", "macros"] }
eyre = { version = "0.6.12", default-features = false }

4
entities/src/lib.rs Normal file
View File

@ -0,0 +1,4 @@
pub mod market;
pub mod ticker;
pub use sea_orm;

View File

@ -0,0 +1,34 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.15
mod svc;
use sea_orm::entity::prelude::*;
pub use svc::Service;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "market")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: String,
pub short_name: String,
pub full_name: String,
pub opens_from: i32,
pub opens_till: i32,
pub open_time: String,
pub close_time: String,
pub pre_market_offset: i32,
pub post_market_offset: i32,
pub time_zone_offset: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::ticker::Entity")]
Ticker,
}
impl Related<super::ticker::Entity> for Entity {
fn to() -> RelationDef {
Relation::Ticker.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

View File

@ -0,0 +1,69 @@
use {
super::{Entity as MarketEntity, Model as MarketModel},
eyre::Result,
sea_orm::{DatabaseConnection, EntityTrait, IntoActiveModel},
std::{
sync::{Arc, Mutex},
time::Instant,
},
};
pub struct Service {
conn: Arc<DatabaseConnection>,
}
impl Service {
pub async fn new(conn: Arc<DatabaseConnection>) -> Self {
Self { conn }
}
pub async fn get_all(&self) -> Result<Vec<MarketModel>> {
// let conn = self.conn.unwrap();
let markets = MarketEntity::find().all(&*self.conn).await?;
Ok(markets)
}
pub async fn create(
&self,
id: &str,
short_name: &str,
full_name: &str,
opens_from: i32,
opens_till: i32,
open_time: &str,
close_time: &str,
pre_market_offset: i32,
post_market_offset: i32,
time_zone_offset: i32,
) -> Result<MarketModel> {
let mut start = Instant::now();
let market = MarketModel {
id: id.to_string(),
short_name: short_name.to_string(),
full_name: full_name.to_string(),
opens_from,
opens_till,
open_time: open_time.to_string(),
close_time: close_time.to_string(),
pre_market_offset,
post_market_offset,
time_zone_offset,
};
println!("create market model took {:?}", start.elapsed());
start = Instant::now();
MarketEntity::insert(market.clone().into_active_model()).exec(&*self.conn).await?;
println!("insert market model took {:?}", start.elapsed());
Ok(market)
}
pub async fn create_from_model(&self, market: MarketModel) -> Result<MarketModel> {
let start = Instant::now();
MarketEntity::insert(market.clone().into_active_model()).exec(&*self.conn).await?;
println!("insert market model took {:?}", start.elapsed());
Ok(market)
}
}

32
entities/src/ticker.rs Normal file
View File

@ -0,0 +1,32 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.15
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "ticker")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: String,
pub symbol: String,
pub market_id: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::market::Entity",
from = "Column::MarketId",
to = "super::market::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
Market,
}
impl Related<super::market::Entity> for Entity {
fn to() -> RelationDef {
Relation::Market.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

14
gateway/Cargo.toml Normal file
View File

@ -0,0 +1,14 @@
[package]
name = "gateway"
version = "0.1.0"
edition = "2021"
publish = false
[lib]
name = "gateway"
path = "src/lib.rs"
[dependencies]
entities = { path = "../entities" }
eyre = { version = "0.6.12", default-features = false }

0
gateway/src/lib.rs Normal file
View File

19
grpc/Cargo.toml Normal file
View File

@ -0,0 +1,19 @@
[package]
name = "grpc"
version = "0.1.0"
edition = "2021"
publish = false
[lib]
name = "grpc"
path = "src/lib.rs"
[dependencies]
entities = { path = "../entities" }
eyre = { version = "0.6.12", default-features = false }
prost = "0.12.3"
tonic = "0.11.0"
[build-dependencies]
tonic-build = "0.11.0"

17
grpc/build.rs Normal file
View File

@ -0,0 +1,17 @@
fn main() {
let proto_files = vec!["./proto/rustler.proto", "./proto/market.proto"];
for proto_file in proto_files {
compile_proto(proto_file);
}
}
/// builds .proto files into `Rust` code
fn compile_proto(proto_file: &str) {
tonic_build::configure()
.build_server(true)
.compile(&[proto_file], &["."])
.unwrap_or_else(|e| panic!("protobuf compile error: {}", e));
println!("cargo:rerun-if-changed={}", proto_file);
}

27
grpc/proto/market.proto Normal file
View File

@ -0,0 +1,27 @@
syntax = "proto3";
package market;
service MarketApi {
rpc GetAll (Empty) returns (Markets) {}
rpc Create (Market) returns (Market) {}
}
message Empty { }
message Market {
string id = 1;
string short_name = 2;
string full_name = 3;
int32 opens_from = 4;
int32 opens_till = 5;
string open_time = 6;
string close_time = 7;
int32 pre_market_offset = 8;
int32 post_market_offset = 9;
int32 time_zone_offset = 10;
}
message Markets {
repeated Market markets = 1;
}

40
grpc/proto/rustler.proto Normal file
View File

@ -0,0 +1,40 @@
syntax = "proto3";
package rustler;
service RustlerApi {
rpc GetAll (Empty) returns (Rustlers) {}
rpc ManageRustler (ManageRustlerDTO) returns (Rustler) {}
}
message Empty { }
enum RustlerStatus {
connected = 0;
connecting = 1;
disconnected = 2;
}
enum RustlerControlAction {
connect = 0;
disconnect = 1;
}
message ManageRustlerDTO {
string name = 1;
RustlerControlAction action = 2;
}
message Rustler {
string name = 1;
RustlerStatus status = 2;
string nextRun = 3;
string nextStop = 4;
string lastRun = 5;
string lastStop = 6;
string lastUpdate = 7;
}
message Rustlers {
repeated Rustler rustlers = 1;
}

1
grpc/src/lib.rs Normal file
View File

@ -0,0 +1 @@
pub mod server;

94
grpc/src/server.rs Normal file
View File

@ -0,0 +1,94 @@
use {
core::time,
entities::{market, sea_orm::DatabaseConnection},
eyre::Result,
std::{env, sync::Arc, thread, time::Instant},
tonic::{transport::Server, Request, Response, Status},
};
pub mod market_mod {
tonic::include_proto!("market");
}
use market_mod::{
market_api_server::{MarketApi, MarketApiServer},
Market, Markets,
};
use self::market_mod::Empty;
impl Market {
fn into_model(self) -> market::Model {
market::Model {
id: self.id,
short_name: self.short_name,
full_name: self.full_name,
opens_from: self.opens_from,
opens_till: self.opens_till,
open_time: self.open_time,
close_time: self.close_time,
pre_market_offset: self.pre_market_offset,
post_market_offset: self.post_market_offset,
time_zone_offset: self.time_zone_offset,
}
}
fn from_model(model: market::Model) -> Self {
Self {
id: model.id,
short_name: model.short_name,
full_name: model.full_name,
opens_from: model.opens_from,
opens_till: model.opens_till,
open_time: model.open_time,
close_time: model.close_time,
pre_market_offset: model.pre_market_offset,
post_market_offset: model.post_market_offset,
time_zone_offset: model.time_zone_offset,
}
}
}
pub struct GrpcServer {
svc: market::Service,
}
#[tonic::async_trait]
impl MarketApi for GrpcServer {
async fn get_all(&self, _: Request<Empty>) -> Result<Response<Markets>, Status> {
let start = Instant::now();
if let Some(mkts) = self.svc.get_all().await.ok() {
println!("get_all took {:?}", start.elapsed());
Ok(Response::new(Markets {
markets: mkts.into_iter().map(|m| Market::from_model(m)).collect(),
}))
} else {
println!("get_all took {:?}", start.elapsed());
Err(Status::internal("Failed to get markets"))
}
}
async fn create(&self, market: Request<Market>) -> Result<Response<Market>, Status> {
let start = Instant::now();
let mkt = market.into_inner().into_model();
if let Some(m) = self.svc.create_from_model(mkt).await.ok() {
println!("create took {:?}", start.elapsed());
Ok(Response::new(Market::from_model(m)))
} else {
println!("create took {:?} on err", start.elapsed());
Err(Status::internal("Failed to create market"))
}
}
}
/// Starts the gRPC server
pub async fn start(conn: Arc<DatabaseConnection>) -> Result<()> {
let addr = "0.0.0.0:50051".parse()?;
let svc = market::Service::new(conn).await;
let server = GrpcServer { svc };
Server::builder().add_service(MarketApiServer::new(server)).serve(addr).await?;
Ok(())
}

20
migration/Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "migration"
version = "0.1.0"
edition = "2021"
publish = false
[lib]
name = "migration"
path = "src/lib.rs"
[dependencies]
async-std = { version = "1", features = ["attributes", "tokio1"] }
entities = { path = "../entities" }
[dependencies.sea-orm-migration]
version = "0.12.15"
features = [
"runtime-tokio-native-tls",
"sqlx-sqlite",
]

41
migration/README.md Normal file
View File

@ -0,0 +1,41 @@
# Running Migrator CLI
- Generate a new migration file
```sh
cargo run -- generate MIGRATION_NAME
```
- Apply all pending migrations
```sh
cargo run
```
```sh
cargo run -- up
```
- Apply first 10 pending migrations
```sh
cargo run -- up -n 10
```
- Rollback last applied migrations
```sh
cargo run -- down
```
- Rollback last 10 applied migrations
```sh
cargo run -- down -n 10
```
- Drop all tables from the database, then reapply all migrations
```sh
cargo run -- fresh
```
- Rollback all applied migrations, then reapply all migrations
```sh
cargo run -- refresh
```
- Rollback all applied migrations
```sh
cargo run -- reset
```
- Check the status of all migrations
```sh
cargo run -- status
```

21
migration/src/lib.rs Normal file
View File

@ -0,0 +1,21 @@
use sea_orm_migration::async_trait::async_trait;
pub use sea_orm_migration::prelude::*;
mod m20220101_000001_create_table_market;
mod m20240325_200049_create_table_ticker;
pub struct Migrator;
#[async_trait]
impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![
Box::new(m20220101_000001_create_table_market::Migration),
Box::new(m20240325_200049_create_table_ticker::Migration),
]
}
fn migration_table_name() -> sea_orm::DynIden {
Alias::new("migrations").into_iden()
}
}

View File

@ -0,0 +1,56 @@
use sea_orm_migration::{async_trait::async_trait, prelude::*};
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Market::Table)
.if_not_exists()
.col(ColumnDef::new(Market::Id).string().not_null().primary_key())
.col(ColumnDef::new(Market::ShortName).string().not_null())
.col(ColumnDef::new(Market::FullName).string().not_null())
.col(ColumnDef::new(Market::OpensFrom).integer().not_null())
.col(ColumnDef::new(Market::OpensTill).integer().not_null())
.col(ColumnDef::new(Market::OpenTime).string().not_null())
.col(ColumnDef::new(Market::CloseTime).string().not_null())
.col(ColumnDef::new(Market::PreMarketOffset).integer().not_null())
.col(ColumnDef::new(Market::PostMarketOffset).integer().not_null())
.col(ColumnDef::new(Market::TimeZoneOffset).integer().not_null())
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager.drop_table(Table::drop().table(Market::Table).to_owned()).await
}
}
#[derive(DeriveIden)]
enum Market {
Table,
Id,
/// Short name of the market (e.g. "NYSE")
ShortName,
/// Full name of the market (e.g. "New York Stock Exchange")
FullName,
/// Day of the week the market opens (using UTC 0-6, where 0 is Sunday)
OpensFrom,
/// Day of the week the market closes (using UTC 0-6, where 0 is Sunday)
OpensTill,
/// Time the market opens (string in HH:MM format)
OpenTime,
/// Time the market closes (string in HH:MM format)
CloseTime,
/// Pre-market offset (in hours)
PreMarketOffset,
/// Post-market offset (in hours)
PostMarketOffset,
/// Timezone offset (UTC offset for the opening and closing times in hours)
TimeZoneOffset,
}

View File

@ -0,0 +1,49 @@
use sea_orm_migration::{async_trait::async_trait, prelude::*};
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Ticker::Table)
.if_not_exists()
.col(ColumnDef::new(Ticker::Id).string().not_null().primary_key())
.col(ColumnDef::new(Ticker::Symbol).string().not_null())
.col(ColumnDef::new(Ticker::MarketId).string().not_null())
.foreign_key(
ForeignKey::create()
.name("fk_ticker_market_id")
.from(Ticker::Table, Ticker::MarketId)
.to(Market::Table, Market::Id)
.on_delete(ForeignKeyAction::Cascade)
.on_update(ForeignKeyAction::Cascade),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager.drop_table(Table::drop().table(Ticker::Table).to_owned()).await
}
}
#[derive(DeriveIden)]
enum Market {
Table,
Id,
}
#[derive(DeriveIden)]
enum Ticker {
Table,
Id,
/// Ticker symbol (e.g. "GOOGL")
Symbol,
/// Market ID
MarketId,
}

6
migration/src/main.rs Normal file
View File

@ -0,0 +1,6 @@
use sea_orm_migration::prelude::*;
#[async_std::main]
async fn main() {
cli::run_cli(migration::Migrator).await;
}

0
rustlers/Cargo.toml Normal file
View File

View File

@ -1,7 +0,0 @@
use eyre::Result;
fn main() -> Result<()> {
println!("Hello, rustler!");
Ok(())
}

View File

@ -15,6 +15,7 @@
},
"lucodear-icons.files.associations": {
}
},
"rust-analyzer.checkOnSave": true
}
}