refactor (backend): port add-note-to-antenna to backend-rs

I hit this bug: https://github.com/napi-rs/napi-rs/issues/2060
This commit is contained in:
naskya 2024-04-20 06:54:06 +09:00
parent f486caf244
commit ccbd6178e4
No known key found for this signature in database
GPG Key ID: 712D413B3A9FED5C
15 changed files with 247 additions and 43 deletions

31
Cargo.lock generated
View File

@ -227,6 +227,7 @@ dependencies = [
"serde",
"serde_json",
"serde_yaml",
"strum 0.26.2",
"thiserror",
"tokio",
"url",
@ -2096,6 +2097,12 @@ dependencies = [
"untrusted",
]
[[package]]
name = "rustversion"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80af6f9131f277a45a3fba6ce8e2258037bb0477a67e610d3c1fe046ab31de47"
[[package]]
name = "ryu"
version = "1.0.17"
@ -2176,7 +2183,7 @@ dependencies = [
"serde",
"serde_json",
"sqlx",
"strum",
"strum 0.25.0",
"thiserror",
"time",
"tracing",
@ -2708,6 +2715,28 @@ version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
[[package]]
name = "strum"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29"
dependencies = [
"strum_macros",
]
[[package]]
name = "strum_macros"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6cf59daf282c0a494ba14fd21610a0325f9f90ec9d1231dea26bcb1d696c946"
dependencies = [
"heck 0.4.1",
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.58",
]
[[package]]
name = "subtle"
version = "2.5.0"

View File

@ -33,6 +33,7 @@ sea-orm = "0.12.15"
serde = "1.0.197"
serde_json = "1.0.115"
serde_yaml = "0.9.34"
strum = "0.26.2"
syn = "2.0.58"
thiserror = "1.0.58"
tokio = "1.37.0"

View File

@ -37,6 +37,7 @@ sea-orm = { workspace = true, features = ["sqlx-postgres", "runtime-tokio-rustls
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
strum = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
url = { workspace = true }

View File

@ -1098,6 +1098,7 @@ export interface Webhook {
latestSentAt: Date | null
latestStatus: number | null
}
export function addNoteToAntenna(antennaId: string, note: Note): void
/** Initializes Cuid2 generator. Must be called before any [create_id]. */
export function initIdGenerator(length: number, fingerprint: string): void
export function getTimestamp(id: string): number

View File

@ -310,7 +310,7 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}
const { loadEnv, loadConfig, stringToAcct, acctToString, checkWordMute, getFullApAccount, isSelfHost, isSameOrigin, extractHost, toPuny, isUnicodeEmoji, sqlLikeEscape, safeForSql, formatMilliseconds, getNoteSummary, toMastodonId, fromMastodonId, fetchMeta, metaToPugArgs, nyaify, hashPassword, verifyPassword, isOldPasswordAlgorithm, decodeReaction, countReactions, toDbReaction, AntennaSrcEnum, MutedNoteReasonEnum, NoteVisibilityEnum, NotificationTypeEnum, PageVisibilityEnum, PollNotevisibilityEnum, RelayStatusEnum, UserEmojimodpermEnum, UserProfileFfvisibilityEnum, UserProfileMutingnotificationtypesEnum, initIdGenerator, getTimestamp, genId, secureRndstr } = nativeBinding
const { loadEnv, loadConfig, stringToAcct, acctToString, checkWordMute, getFullApAccount, isSelfHost, isSameOrigin, extractHost, toPuny, isUnicodeEmoji, sqlLikeEscape, safeForSql, formatMilliseconds, getNoteSummary, toMastodonId, fromMastodonId, fetchMeta, metaToPugArgs, nyaify, hashPassword, verifyPassword, isOldPasswordAlgorithm, decodeReaction, countReactions, toDbReaction, AntennaSrcEnum, MutedNoteReasonEnum, NoteVisibilityEnum, NotificationTypeEnum, PageVisibilityEnum, PollNotevisibilityEnum, RelayStatusEnum, UserEmojimodpermEnum, UserProfileFfvisibilityEnum, UserProfileMutingnotificationtypesEnum, addNoteToAntenna, initIdGenerator, getTimestamp, genId, secureRndstr } = nativeBinding
module.exports.loadEnv = loadEnv
module.exports.loadConfig = loadConfig
@ -348,6 +348,7 @@ module.exports.RelayStatusEnum = RelayStatusEnum
module.exports.UserEmojimodpermEnum = UserEmojimodpermEnum
module.exports.UserProfileFfvisibilityEnum = UserProfileFfvisibilityEnum
module.exports.UserProfileMutingnotificationtypesEnum = UserProfileMutingnotificationtypesEnum
module.exports.addNoteToAntenna = addNoteToAntenna
module.exports.initIdGenerator = initIdGenerator
module.exports.getTimestamp = getTimestamp
module.exports.genId = genId

View File

@ -4,4 +4,5 @@ pub mod config;
pub mod database;
pub mod misc;
pub mod model;
pub mod service;
pub mod util;

View File

@ -0,0 +1,20 @@
use crate::database::{redis_conn, redis_key};
use crate::model::entity::note;
use crate::service::stream::{publish, Error, Stream};
use crate::util::id::get_timestamp;
use redis::{streams::StreamMaxlen, Commands};
#[crate::export]
pub fn add_note_to_antenna(antenna_id: &str, note: &note::Model) -> Result<(), Error> {
redis_conn()?.xadd_maxlen(
redis_key(format!("antennaTimeline:{}", antenna_id)),
StreamMaxlen::Approx(200),
format!("{}-*", get_timestamp(&note.id)),
&[("note", &note.id)],
)?;
let stream = Stream::Antenna {
id: antenna_id.to_string(),
};
publish(&stream, Some("note"), Some(serde_json::to_value(note)?))
}

View File

@ -0,0 +1,2 @@
pub mod add_note_to_antenna;
pub mod stream;

View File

@ -0,0 +1,86 @@
use crate::database::redis_conn;
use redis::{Commands, RedisError};
#[derive(strum::Display, serde::Serialize)]
pub enum Stream {
#[strum(serialize = "internal")]
Internal,
#[strum(serialize = "broadcast")]
Broadcast,
#[strum(to_string = "adminStream:{id}")]
Admin { id: String },
#[strum(to_string = "user:{id}")]
User { id: String },
#[strum(to_string = "channelStream:{id}")]
Channel { id: String },
#[strum(to_string = "noteStream:{id}")]
Note { id: String },
#[strum(serialize = "notesStream")]
Notes,
#[strum(to_string = "userListStream:{id}")]
UserList { id: String },
#[strum(to_string = "mainStream:{id}")]
Main { id: String },
#[strum(to_string = "driveStream:{id}")]
Drive { id: String },
#[strum(to_string = "antennaStream:{id}")]
Antenna { id: String },
#[strum(to_string = "messagingStream:{id}")]
Messaging { id: String },
#[strum(to_string = "messagingIndexStream:{id}")]
MessagingIndex { id: String },
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Redis error: {0}")]
RedisError(#[from] RedisError),
#[error("Json serialization error: {0}")]
JsonError(#[from] serde_json::Error),
#[error("Value error: {0}")]
ValueError(String),
}
pub fn publish(
channel: &Stream,
kind: Option<&str>,
value: Option<serde_json::Value>,
) -> Result<(), Error> {
#[derive(serde::Serialize)]
struct Message {
r#type: String,
body: Option<serde_json::Value>,
}
let message = if let Some(kind) = kind {
serde_json::to_value(Message {
r#type: kind.to_string(),
body: value,
})?
} else {
value.ok_or(Error::ValueError("Invalid streaming message".to_string()))?
};
redis_conn()?.publish(channel.to_string(), message.to_string())?;
Ok(())
}
#[cfg(test)]
mod unit_test {
use super::Stream;
use pretty_assertions::assert_eq;
#[test]
fn channel_to_string() {
assert_eq!(Stream::Internal.to_string(), "internal");
assert_eq!(Stream::Broadcast.to_string(), "broadcast");
assert_eq!(
Stream::Admin {
id: "9tb42br63g5apjcq".to_string()
}
.to_string(),
"adminStream:9tb42br63g5apjcq"
);
}
}

View File

@ -178,6 +178,7 @@
"ts-loader": "9.5.1",
"ts-node": "10.9.2",
"tsconfig-paths": "4.2.0",
"type-fest": "4.15.0",
"typescript": "5.4.5",
"webpack": "^5.91.0",
"ws": "8.16.0"

View File

@ -0,0 +1,76 @@
// https://gist.github.com/tkrotoff/a6baf96eb6b61b445a9142e5555511a0
import type { Primitive } from "type-fest";
type NullToUndefined<T> = T extends null
? undefined
: T extends Primitive | Function | Date | RegExp
? T
: T extends Array<infer U>
? Array<NullToUndefined<U>>
: T extends Map<infer K, infer V>
? Map<K, NullToUndefined<V>>
: T extends Set<infer U>
? Set<NullToUndefined<U>>
: T extends object
? { [K in keyof T]: NullToUndefined<T[K]> }
: unknown;
type UndefinedToNull<T> = T extends undefined
? null
: T extends Primitive | Function | Date | RegExp
? T
: T extends Array<infer U>
? Array<UndefinedToNull<U>>
: T extends Map<infer K, infer V>
? Map<K, UndefinedToNull<V>>
: T extends Set<infer U>
? Set<NullToUndefined<U>>
: T extends object
? { [K in keyof T]: UndefinedToNull<T[K]> }
: unknown;
function _nullToUndefined<T>(obj: T): NullToUndefined<T> {
if (obj === null) {
return undefined as any;
}
if (typeof obj === "object") {
if (obj instanceof Map) {
obj.forEach((value, key) => obj.set(key, _nullToUndefined(value)));
} else {
for (const key in obj) {
obj[key] = _nullToUndefined(obj[key]) as any;
}
}
}
return obj as any;
}
function _undefinedToNull<T>(obj: T): UndefinedToNull<T> {
if (obj === undefined) {
return null as any;
}
if (typeof obj === "object") {
if (obj instanceof Map) {
obj.forEach((value, key) => obj.set(key, _undefinedToNull(value)));
} else {
for (const key in obj) {
obj[key] = _undefinedToNull(obj[key]) as any;
}
}
}
return obj as any;
}
/**
* Recursively converts all undefined values to null.
*
* @param obj object to convert
* @returns a copy of the object with all its undefined values converted to null
*/
export function undefinedToNull<T>(obj: T) {
return _undefinedToNull(structuredClone(obj));
}

View File

@ -1,24 +0,0 @@
import type { Antenna } from "@/models/entities/antenna.js";
import type { Note } from "@/models/entities/note.js";
import { getTimestamp } from "backend-rs";
import { redisClient } from "@/db/redis.js";
import { publishAntennaStream } from "@/services/stream.js";
import type { User } from "@/models/entities/user.js";
export async function addNoteToAntenna(
antenna: Antenna,
note: Note,
_noteUser: { id: User["id"] },
) {
redisClient.xadd(
`antennaTimeline:${antenna.id}`,
"MAXLEN",
"~",
"200",
`${getTimestamp(note.id)}-*`,
"note",
note.id,
);
publishAntennaStream(antenna.id, "note", note);
}

View File

@ -44,8 +44,7 @@ import { Poll } from "@/models/entities/poll.js";
import { createNotification } from "@/services/create-notification.js";
import { isDuplicateKeyValueError } from "@/misc/is-duplicate-key-value-error.js";
import { checkHitAntenna } from "@/misc/check-hit-antenna.js";
import { checkWordMute } from "backend-rs";
import { addNoteToAntenna } from "@/services/add-note-to-antenna.js";
import { addNoteToAntenna, checkWordMute } from "backend-rs";
import { countSameRenotes } from "@/misc/count-same-renotes.js";
import { deliverToRelays, getCachedRelays } from "../relay.js";
import type { Channel } from "@/models/entities/channel.js";
@ -63,6 +62,7 @@ import { Mutex } from "redis-semaphore";
import { langmap } from "@/misc/langmap.js";
import Logger from "@/services/logger.js";
import { inspect } from "node:util";
import { undefinedToNull } from "@/prelude/undefined-to-null.js";
const logger = new Logger("create-note");
@ -399,7 +399,8 @@ export default async (
for (const antenna of await getAntennas()) {
checkHitAntenna(antenna, note, user).then((hit) => {
if (hit) {
addNoteToAntenna(antenna, note, user);
// TODO: do this more sanely
addNoteToAntenna(antenna.id, undefinedToNull(note) as Note);
}
});
}

View File

@ -4,12 +4,12 @@ import type { Note } from "@/models/entities/note.js";
import type { UserList } from "@/models/entities/user-list.js";
import type { UserGroup } from "@/models/entities/user-group.js";
import { config } from "@/config.js";
import type { Antenna } from "@/models/entities/antenna.js";
// import type { Antenna } from "@/models/entities/antenna.js";
import type { Channel } from "@/models/entities/channel.js";
import type {
StreamChannels,
AdminStreamTypes,
AntennaStreamTypes,
// AntennaStreamTypes,
BroadcastTypes,
ChannelStreamTypes,
DriveStreamTypes,
@ -134,17 +134,17 @@ class Publisher {
);
};
public publishAntennaStream = <K extends keyof AntennaStreamTypes>(
antennaId: Antenna["id"],
type: K,
value?: AntennaStreamTypes[K],
): void => {
this.publish(
`antennaStream:${antennaId}`,
type,
typeof value === "undefined" ? null : value,
);
};
// public publishAntennaStream = <K extends keyof AntennaStreamTypes>(
// antennaId: Antenna["id"],
// type: K,
// value?: AntennaStreamTypes[K],
// ): void => {
// this.publish(
// `antennaStream:${antennaId}`,
// type,
// typeof value === "undefined" ? null : value,
// );
// };
public publishMessagingStream = <K extends keyof MessagingStreamTypes>(
userId: User["id"],
@ -217,7 +217,7 @@ export const publishNoteStream = publisher.publishNoteStream;
export const publishNotesStream = publisher.publishNotesStream;
export const publishChannelStream = publisher.publishChannelStream;
export const publishUserListStream = publisher.publishUserListStream;
export const publishAntennaStream = publisher.publishAntennaStream;
// export const publishAntennaStream = publisher.publishAntennaStream;
export const publishMessagingStream = publisher.publishMessagingStream;
export const publishGroupMessagingStream =
publisher.publishGroupMessagingStream;

View File

@ -521,6 +521,9 @@ importers:
tsconfig-paths:
specifier: 4.2.0
version: 4.2.0
type-fest:
specifier: 4.15.0
version: 4.15.0
typescript:
specifier: 5.4.5
version: 5.4.5
@ -16632,6 +16635,11 @@ packages:
engines: {node: '>=8'}
dev: true
/type-fest@4.15.0:
resolution: {integrity: sha512-tB9lu0pQpX5KJq54g+oHOLumOx+pMep4RaM6liXh2PKmVRFF+/vAtUP0ZaJ0kOySfVNjF6doBWPHhBhISKdlIA==}
engines: {node: '>=16'}
dev: true
/type-is@1.6.18:
resolution: {integrity: sha512-TkRKr9sUTxEH8MdfuCSP7VizJyzRNMjj2J2do2Jr3Kym598JVdEksuzPQCnlFPW4ky9Q+iA+ma9BGm06XQBy8g==}
engines: {node: '>= 0.6'}